diff --git a/Cargo.lock b/Cargo.lock index 1f02a8e7..6f59b89f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8312,8 +8312,12 @@ dependencies = [ "http 1.3.1", "hyper 1.6.0", "lazy_static", + "rand 0.9.1", "rustfs-utils", + "s3s", + "serde", "serde_urlencoded", + "tempfile", "time", "tracing", ] diff --git a/crates/common/src/last_minute.rs b/crates/common/src/last_minute.rs index b5b22eb3..7ab189bc 100644 --- a/crates/common/src/last_minute.rs +++ b/crates/common/src/last_minute.rs @@ -568,14 +568,11 @@ mod tests { let mut latency = LastMinuteLatency::default(); // Add data at time 1000 - latency.add_all( - 1000, - &AccElem { - total: 10, - size: 0, - n: 1, - }, - ); + latency.add_all(1000, &AccElem { + total: 10, + size: 0, + n: 1, + }); // Forward to time 1030 (30 seconds later) latency.forward_to(1030); diff --git a/crates/ecstore/src/bucket/metadata_sys.rs b/crates/ecstore/src/bucket/metadata_sys.rs index 791134da..8a86f365 100644 --- a/crates/ecstore/src/bucket/metadata_sys.rs +++ b/crates/ecstore/src/bucket/metadata_sys.rs @@ -236,13 +236,10 @@ impl BucketMetadataSys { futures.push(async move { sleep(Duration::from_millis(30)).await; let _ = api - .heal_bucket( - &bucket, - &HealOpts { - recreate: true, - ..Default::default() - }, - ) + .heal_bucket(&bucket, &HealOpts { + recreate: true, + ..Default::default() + }) .await; load_bucket_metadata(self.api.clone(), bucket.as_str()).await }); diff --git a/crates/ecstore/src/client/api_bucket_policy.rs b/crates/ecstore/src/client/api_bucket_policy.rs index 8ed9c606..7b9dec29 100644 --- a/crates/ecstore/src/client/api_bucket_policy.rs +++ b/crates/ecstore/src/client/api_bucket_policy.rs @@ -74,26 +74,23 @@ impl TransitionClient { url_values.insert("policy".to_string(), "".to_string()); let resp = self - .execute_method( - http::Method::DELETE, - &mut RequestMetadata { - bucket_name: bucket_name.to_string(), - query_values: url_values, - content_sha256_hex: EMPTY_STRING_SHA256_HASH.to_string(), - object_name: "".to_string(), - custom_header: HeaderMap::new(), - content_body: ReaderImpl::Body(Bytes::new()), - content_length: 0, - content_md5_base64: "".to_string(), - stream_sha256: false, - trailer: HeaderMap::new(), - pre_sign_url: Default::default(), - add_crc: Default::default(), - extra_pre_sign_header: Default::default(), - bucket_location: Default::default(), - expires: Default::default(), - }, - ) + .execute_method(http::Method::DELETE, &mut RequestMetadata { + bucket_name: bucket_name.to_string(), + query_values: url_values, + content_sha256_hex: EMPTY_STRING_SHA256_HASH.to_string(), + object_name: "".to_string(), + custom_header: HeaderMap::new(), + content_body: ReaderImpl::Body(Bytes::new()), + content_length: 0, + content_md5_base64: "".to_string(), + stream_sha256: false, + trailer: HeaderMap::new(), + pre_sign_url: Default::default(), + add_crc: Default::default(), + extra_pre_sign_header: Default::default(), + bucket_location: Default::default(), + expires: Default::default(), + }) .await?; //defer closeResponse(resp) @@ -114,26 +111,23 @@ impl TransitionClient { url_values.insert("policy".to_string(), "".to_string()); let resp = self - .execute_method( - http::Method::GET, - &mut RequestMetadata { - bucket_name: bucket_name.to_string(), - query_values: url_values, - content_sha256_hex: EMPTY_STRING_SHA256_HASH.to_string(), - object_name: "".to_string(), - custom_header: HeaderMap::new(), - content_body: ReaderImpl::Body(Bytes::new()), - content_length: 0, - content_md5_base64: "".to_string(), - stream_sha256: false, - trailer: HeaderMap::new(), - pre_sign_url: Default::default(), - add_crc: Default::default(), - extra_pre_sign_header: Default::default(), - bucket_location: Default::default(), - expires: Default::default(), - }, - ) + .execute_method(http::Method::GET, &mut RequestMetadata { + bucket_name: bucket_name.to_string(), + query_values: url_values, + content_sha256_hex: EMPTY_STRING_SHA256_HASH.to_string(), + object_name: "".to_string(), + custom_header: HeaderMap::new(), + content_body: ReaderImpl::Body(Bytes::new()), + content_length: 0, + content_md5_base64: "".to_string(), + stream_sha256: false, + trailer: HeaderMap::new(), + pre_sign_url: Default::default(), + add_crc: Default::default(), + extra_pre_sign_header: Default::default(), + bucket_location: Default::default(), + expires: Default::default(), + }) .await?; let policy = String::from_utf8_lossy(&resp.body().bytes().expect("err").to_vec()).to_string(); diff --git a/crates/ecstore/src/client/api_get_object.rs b/crates/ecstore/src/client/api_get_object.rs index 80759b2c..cce4b171 100644 --- a/crates/ecstore/src/client/api_get_object.rs +++ b/crates/ecstore/src/client/api_get_object.rs @@ -43,26 +43,23 @@ impl TransitionClient { opts: &GetObjectOptions, ) -> Result<(ObjectInfo, HeaderMap, ReadCloser), std::io::Error> { let resp = self - .execute_method( - http::Method::GET, - &mut RequestMetadata { - bucket_name: bucket_name.to_string(), - object_name: object_name.to_string(), - query_values: opts.to_query_values(), - custom_header: opts.header(), - content_sha256_hex: EMPTY_STRING_SHA256_HASH.to_string(), - content_body: ReaderImpl::Body(Bytes::new()), - content_length: 0, - content_md5_base64: "".to_string(), - stream_sha256: false, - trailer: HeaderMap::new(), - pre_sign_url: Default::default(), - add_crc: Default::default(), - extra_pre_sign_header: Default::default(), - bucket_location: Default::default(), - expires: Default::default(), - }, - ) + .execute_method(http::Method::GET, &mut RequestMetadata { + bucket_name: bucket_name.to_string(), + object_name: object_name.to_string(), + query_values: opts.to_query_values(), + custom_header: opts.header(), + content_sha256_hex: EMPTY_STRING_SHA256_HASH.to_string(), + content_body: ReaderImpl::Body(Bytes::new()), + content_length: 0, + content_md5_base64: "".to_string(), + stream_sha256: false, + trailer: HeaderMap::new(), + pre_sign_url: Default::default(), + add_crc: Default::default(), + extra_pre_sign_header: Default::default(), + bucket_location: Default::default(), + expires: Default::default(), + }) .await?; let resp = &resp; diff --git a/crates/ecstore/src/client/api_get_object_acl.rs b/crates/ecstore/src/client/api_get_object_acl.rs index 1a9167a3..898e8aab 100644 --- a/crates/ecstore/src/client/api_get_object_acl.rs +++ b/crates/ecstore/src/client/api_get_object_acl.rs @@ -21,40 +21,40 @@ use bytes::Bytes; use http::{HeaderMap, HeaderValue}; use s3s::dto::Owner; -use std::io::Cursor; use std::collections::HashMap; +use std::io::Cursor; use tokio::io::BufReader; -use rustfs_utils::EMPTY_STRING_SHA256_HASH; use crate::client::{ api_error_response::{err_invalid_argument, http_resp_to_error_response}, api_get_options::GetObjectOptions, - transition_api::{to_object_info, ObjectInfo, ReadCloser, ReaderImpl, RequestMetadata, TransitionClient}, + transition_api::{ObjectInfo, ReadCloser, ReaderImpl, RequestMetadata, TransitionClient, to_object_info}, }; +use rustfs_utils::EMPTY_STRING_SHA256_HASH; #[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)] -struct Grantee { - id: String, - display_name: String, - uri: String, +pub struct Grantee { + pub id: String, + pub display_name: String, + pub uri: String, } #[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)] -struct Grant { - grantee: Grantee, - permission: String, +pub struct Grant { + pub grantee: Grantee, + pub permission: String, } #[derive(Debug, Default, serde::Serialize, serde::Deserialize)] pub struct AccessControlList { - pub grant: Vec, + pub grant: Vec, pub permission: String, } #[derive(Debug, Default, serde::Deserialize)] pub struct AccessControlPolicy { #[serde(skip)] - owner: Owner, + owner: Owner, pub access_control_list: AccessControlList, } @@ -63,26 +63,23 @@ impl TransitionClient { let mut url_values = HashMap::new(); url_values.insert("acl".to_string(), "".to_string()); let mut resp = self - .execute_method( - http::Method::GET, - &mut RequestMetadata { - bucket_name: bucket_name.to_string(), - object_name: object_name.to_string(), - query_values: url_values, - custom_header: HeaderMap::new(), - content_sha256_hex: EMPTY_STRING_SHA256_HASH.to_string(), - content_body: ReaderImpl::Body(Bytes::new()), - content_length: 0, - content_md5_base64: "".to_string(), - stream_sha256: false, - trailer: HeaderMap::new(), - pre_sign_url: Default::default(), - add_crc: Default::default(), - extra_pre_sign_header: Default::default(), - bucket_location: Default::default(), - expires: Default::default(), - }, - ) + .execute_method(http::Method::GET, &mut RequestMetadata { + bucket_name: bucket_name.to_string(), + object_name: object_name.to_string(), + query_values: url_values, + custom_header: HeaderMap::new(), + content_sha256_hex: EMPTY_STRING_SHA256_HASH.to_string(), + content_body: ReaderImpl::Body(Bytes::new()), + content_length: 0, + content_md5_base64: "".to_string(), + stream_sha256: false, + trailer: HeaderMap::new(), + pre_sign_url: Default::default(), + add_crc: Default::default(), + extra_pre_sign_header: Default::default(), + bucket_location: Default::default(), + expires: Default::default(), + }) .await?; if resp.status() != http::StatusCode::OK { @@ -98,7 +95,9 @@ impl TransitionClient { } }; - let mut obj_info = self.stat_object(bucket_name, object_name, &GetObjectOptions::default()).await?; + let mut obj_info = self + .stat_object(bucket_name, object_name, &GetObjectOptions::default()) + .await?; obj_info.owner.display_name = res.owner.display_name.clone(); obj_info.owner.id = res.owner.id.clone(); @@ -107,7 +106,9 @@ impl TransitionClient { let canned_acl = get_canned_acl(&res); if canned_acl != "" { - obj_info.metadata.insert("X-Amz-Acl", HeaderValue::from_str(&canned_acl).unwrap()); + obj_info + .metadata + .insert("X-Amz-Acl", HeaderValue::from_str(&canned_acl).unwrap()); return Ok(obj_info); } diff --git a/crates/ecstore/src/client/api_get_object_attributes.rs b/crates/ecstore/src/client/api_get_object_attributes.rs index 9204e7dd..15c25f34 100644 --- a/crates/ecstore/src/client/api_get_object_attributes.rs +++ b/crates/ecstore/src/client/api_get_object_attributes.rs @@ -20,41 +20,43 @@ use bytes::Bytes; use http::{HeaderMap, HeaderValue}; -use time::OffsetDateTime; -use std::io::Cursor; use std::collections::HashMap; +use std::io::Cursor; +use time::OffsetDateTime; use tokio::io::BufReader; -use s3s::{Body, dto::Owner}; -use s3s::header::{X_AMZ_OBJECT_ATTRIBUTES, X_AMZ_DELETE_MARKER, X_AMZ_METADATA_DIRECTIVE, X_AMZ_VERSION_ID, - X_AMZ_REQUEST_CHARGED, X_AMZ_RESTORE, X_AMZ_PART_NUMBER_MARKER, X_AMZ_MAX_PARTS,}; -use rustfs_utils::EMPTY_STRING_SHA256_HASH; use crate::client::constants::{GET_OBJECT_ATTRIBUTES_MAX_PARTS, GET_OBJECT_ATTRIBUTES_TAGS, ISO8601_DATEFORMAT}; +use rustfs_utils::EMPTY_STRING_SHA256_HASH; +use s3s::header::{ + X_AMZ_DELETE_MARKER, X_AMZ_MAX_PARTS, X_AMZ_METADATA_DIRECTIVE, X_AMZ_OBJECT_ATTRIBUTES, X_AMZ_PART_NUMBER_MARKER, + X_AMZ_REQUEST_CHARGED, X_AMZ_RESTORE, X_AMZ_VERSION_ID, +}; +use s3s::{Body, dto::Owner}; use crate::client::{ api_error_response::err_invalid_argument, + api_get_object_acl::AccessControlPolicy, api_get_options::GetObjectOptions, transition_api::{ObjectInfo, ReadCloser, ReaderImpl, RequestMetadata, TransitionClient, to_object_info}, - api_get_object_acl::AccessControlPolicy, }; -struct ObjectAttributesOptions { - max_parts: i64, - version_id: String, - part_number_marker: i64, +pub struct ObjectAttributesOptions { + pub max_parts: i64, + pub version_id: String, + pub part_number_marker: i64, //server_side_encryption: encrypt::ServerSide, } -struct ObjectAttributes { - version_id: String, - last_modified: OffsetDateTime, - object_attributes_response: ObjectAttributesResponse, +pub struct ObjectAttributes { + pub version_id: String, + pub last_modified: OffsetDateTime, + pub object_attributes_response: ObjectAttributesResponse, } impl ObjectAttributes { fn new() -> Self { Self { - version_id: "".to_string(), + version_id: "".to_string(), last_modified: OffsetDateTime::now_utc(), object_attributes_response: ObjectAttributesResponse::new(), } @@ -63,81 +65,81 @@ impl ObjectAttributes { #[derive(Debug, Default, serde::Deserialize)] pub struct Checksum { - checksum_crc32: String, + checksum_crc32: String, checksum_crc32c: String, - checksum_sha1: String, + checksum_sha1: String, checksum_sha256: String, } impl Checksum { fn new() -> Self { Self { - checksum_crc32: "".to_string(), + checksum_crc32: "".to_string(), checksum_crc32c: "".to_string(), - checksum_sha1: "".to_string(), + checksum_sha1: "".to_string(), checksum_sha256: "".to_string(), } } } #[derive(Debug, Default, serde::Deserialize)] -struct ObjectParts { - parts_count: i64, - part_number_marker: i64, - next_part_number_marker: i64, - max_parts: i64, - is_truncated: bool, - parts: Vec, +pub struct ObjectParts { + pub parts_count: i64, + pub part_number_marker: i64, + pub next_part_number_marker: i64, + pub max_parts: i64, + is_truncated: bool, + parts: Vec, } impl ObjectParts { fn new() -> Self { Self { - parts_count: 0, - part_number_marker: 0, + parts_count: 0, + part_number_marker: 0, next_part_number_marker: 0, - max_parts: 0, - is_truncated: false, - parts: Vec::new(), + max_parts: 0, + is_truncated: false, + parts: Vec::new(), } } } #[derive(Debug, Default, serde::Deserialize)] -struct ObjectAttributesResponse { - etag: String, - storage_class: String, - object_size: i64, - checksum: Checksum, - object_parts: ObjectParts, +pub struct ObjectAttributesResponse { + pub etag: String, + pub storage_class: String, + pub object_size: i64, + pub checksum: Checksum, + pub object_parts: ObjectParts, } impl ObjectAttributesResponse { fn new() -> Self { Self { - etag: "".to_string(), + etag: "".to_string(), storage_class: "".to_string(), - object_size: 0, - checksum: Checksum::new(), - object_parts: ObjectParts::new(), + object_size: 0, + checksum: Checksum::new(), + object_parts: ObjectParts::new(), } } } #[derive(Debug, Default, serde::Deserialize)] struct ObjectAttributePart { - checksum_crc32: String, + checksum_crc32: String, checksum_crc32c: String, - checksum_sha1: String, + checksum_sha1: String, checksum_sha256: String, - part_number: i64, - size: i64, + part_number: i64, + size: i64, } impl ObjectAttributes { pub async fn parse_response(&mut self, resp: &mut http::Response) -> Result<(), std::io::Error> { let h = resp.headers(); - let mod_time = OffsetDateTime::parse(h.get("Last-Modified").unwrap().to_str().unwrap(), ISO8601_DATEFORMAT).unwrap(); //RFC7231Time + let mod_time = OffsetDateTime::parse(h.get("Last-Modified").unwrap().to_str().unwrap(), ISO8601_DATEFORMAT).unwrap(); //RFC7231Time self.last_modified = mod_time; self.version_id = h.get(X_AMZ_VERSION_ID).unwrap().to_str().unwrap().to_string(); @@ -155,7 +157,12 @@ impl ObjectAttributes { } impl TransitionClient { - pub async fn get_object_attributes(&self, bucket_name: &str, object_name: &str, opts: ObjectAttributesOptions) -> Result { + pub async fn get_object_attributes( + &self, + bucket_name: &str, + object_name: &str, + opts: ObjectAttributesOptions, + ) -> Result { let mut url_values = HashMap::new(); url_values.insert("attributes".to_string(), "".to_string()); if opts.version_id != "" { @@ -166,13 +173,19 @@ impl TransitionClient { headers.insert(X_AMZ_OBJECT_ATTRIBUTES, HeaderValue::from_str(GET_OBJECT_ATTRIBUTES_TAGS).unwrap()); if opts.part_number_marker > 0 { - headers.insert(X_AMZ_PART_NUMBER_MARKER, HeaderValue::from_str(&opts.part_number_marker.to_string()).unwrap()); + headers.insert( + X_AMZ_PART_NUMBER_MARKER, + HeaderValue::from_str(&opts.part_number_marker.to_string()).unwrap(), + ); } if opts.max_parts > 0 { headers.insert(X_AMZ_MAX_PARTS, HeaderValue::from_str(&opts.max_parts.to_string()).unwrap()); } else { - headers.insert(X_AMZ_MAX_PARTS, HeaderValue::from_str(&GET_OBJECT_ATTRIBUTES_MAX_PARTS.to_string()).unwrap()); + headers.insert( + X_AMZ_MAX_PARTS, + HeaderValue::from_str(&GET_OBJECT_ATTRIBUTES_MAX_PARTS.to_string()).unwrap(), + ); } /*if opts.server_side_encryption.is_some() { @@ -180,32 +193,31 @@ impl TransitionClient { }*/ let mut resp = self - .execute_method( - http::Method::HEAD, - &mut RequestMetadata { - bucket_name: bucket_name.to_string(), - object_name: object_name.to_string(), - query_values: url_values, - custom_header: headers, - content_sha256_hex: EMPTY_STRING_SHA256_HASH.to_string(), - content_md5_base64: "".to_string(), - content_body: ReaderImpl::Body(Bytes::new()), - content_length: 0, - stream_sha256: false, - trailer: HeaderMap::new(), - pre_sign_url: Default::default(), - add_crc: Default::default(), - extra_pre_sign_header: Default::default(), - bucket_location: Default::default(), - expires: Default::default(), - }, - ) + .execute_method(http::Method::HEAD, &mut RequestMetadata { + bucket_name: bucket_name.to_string(), + object_name: object_name.to_string(), + query_values: url_values, + custom_header: headers, + content_sha256_hex: EMPTY_STRING_SHA256_HASH.to_string(), + content_md5_base64: "".to_string(), + content_body: ReaderImpl::Body(Bytes::new()), + content_length: 0, + stream_sha256: false, + trailer: HeaderMap::new(), + pre_sign_url: Default::default(), + add_crc: Default::default(), + extra_pre_sign_header: Default::default(), + bucket_location: Default::default(), + expires: Default::default(), + }) .await?; let h = resp.headers(); let has_etag = h.get("ETag").unwrap().to_str().unwrap(); if !has_etag.is_empty() { - return Err(std::io::Error::other("get_object_attributes is not supported by the current endpoint version")); + return Err(std::io::Error::other( + "get_object_attributes is not supported by the current endpoint version", + )); } if resp.status() != http::StatusCode::OK { @@ -226,4 +238,4 @@ impl TransitionClient { Ok(oa) } -} \ No newline at end of file +} diff --git a/crates/ecstore/src/client/api_get_object_file.rs b/crates/ecstore/src/client/api_get_object_file.rs index 6bda2069..4c268497 100644 --- a/crates/ecstore/src/client/api_get_object_file.rs +++ b/crates/ecstore/src/client/api_get_object_file.rs @@ -21,15 +21,15 @@ use bytes::Bytes; use http::HeaderMap; use std::io::Cursor; -use tokio::io::BufReader; #[cfg(not(windows))] -use std::os::unix::fs::PermissionsExt; +use std::os::unix::fs::MetadataExt; #[cfg(not(windows))] use std::os::unix::fs::OpenOptionsExt; #[cfg(not(windows))] -use std::os::unix::fs::MetadataExt; +use std::os::unix::fs::PermissionsExt; #[cfg(windows)] use std::os::windows::fs::MetadataExt; +use tokio::io::BufReader; use crate::client::{ api_error_response::err_invalid_argument, @@ -38,14 +38,20 @@ use crate::client::{ }; impl TransitionClient { - pub async fn fget_object(&self, bucket_name: &str, object_name: &str, file_path: &str, opts: GetObjectOptions) -> Result<(), std::io::Error> { + pub async fn fget_object( + &self, + bucket_name: &str, + object_name: &str, + file_path: &str, + opts: GetObjectOptions, + ) -> Result<(), std::io::Error> { match std::fs::metadata(file_path) { Ok(file_path_stat) => { let ft = file_path_stat.file_type(); if ft.is_dir() { return Err(std::io::Error::other(err_invalid_argument("filename is a directory."))); } - }, + } Err(err) => { return Err(std::io::Error::other(err)); } @@ -77,7 +83,7 @@ impl TransitionClient { }; let mut file_part_path = file_path.to_string(); - file_part_path.push_str(""/*sum_sha256_hex(object_stat.etag.as_bytes())*/); + file_part_path.push_str("" /*sum_sha256_hex(object_stat.etag.as_bytes())*/); file_part_path.push_str(".part.rustfs"); #[cfg(not(windows))] @@ -138,4 +144,4 @@ impl TransitionClient { Ok(()) } -} \ No newline at end of file +} diff --git a/crates/ecstore/src/client/api_list.rs b/crates/ecstore/src/client/api_list.rs index f978f063..d2618c4a 100644 --- a/crates/ecstore/src/client/api_list.rs +++ b/crates/ecstore/src/client/api_list.rs @@ -76,26 +76,23 @@ impl TransitionClient { } let mut resp = self - .execute_method( - http::Method::GET, - &mut RequestMetadata { - bucket_name: bucket_name.to_string(), - object_name: "".to_string(), - query_values: url_values, - content_sha256_hex: EMPTY_STRING_SHA256_HASH.to_string(), - custom_header: headers, - content_body: ReaderImpl::Body(Bytes::new()), - content_length: 0, - content_md5_base64: "".to_string(), - stream_sha256: false, - trailer: HeaderMap::new(), - pre_sign_url: Default::default(), - add_crc: Default::default(), - extra_pre_sign_header: Default::default(), - bucket_location: Default::default(), - expires: Default::default(), - }, - ) + .execute_method(http::Method::GET, &mut RequestMetadata { + bucket_name: bucket_name.to_string(), + object_name: "".to_string(), + query_values: url_values, + content_sha256_hex: EMPTY_STRING_SHA256_HASH.to_string(), + custom_header: headers, + content_body: ReaderImpl::Body(Bytes::new()), + content_length: 0, + content_md5_base64: "".to_string(), + stream_sha256: false, + trailer: HeaderMap::new(), + pre_sign_url: Default::default(), + add_crc: Default::default(), + extra_pre_sign_header: Default::default(), + bucket_location: Default::default(), + expires: Default::default(), + }) .await?; if resp.status() != StatusCode::OK { return Err(std::io::Error::other(http_resp_to_error_response(resp, vec![], bucket_name, ""))); diff --git a/crates/ecstore/src/client/api_remove.rs b/crates/ecstore/src/client/api_remove.rs index a6845229..14f6cd2a 100644 --- a/crates/ecstore/src/client/api_remove.rs +++ b/crates/ecstore/src/client/api_remove.rs @@ -78,26 +78,23 @@ impl TransitionClient { let headers = HeaderMap::new(); let resp = self - .execute_method( - Method::DELETE, - &mut RequestMetadata { - bucket_name: bucket_name.to_string(), - content_sha256_hex: EMPTY_STRING_SHA256_HASH.to_string(), - custom_header: headers, - object_name: "".to_string(), - query_values: Default::default(), - content_body: ReaderImpl::Body(Bytes::new()), - content_length: 0, - content_md5_base64: "".to_string(), - stream_sha256: false, - trailer: HeaderMap::new(), - pre_sign_url: Default::default(), - add_crc: Default::default(), - extra_pre_sign_header: Default::default(), - bucket_location: Default::default(), - expires: Default::default(), - }, - ) + .execute_method(Method::DELETE, &mut RequestMetadata { + bucket_name: bucket_name.to_string(), + content_sha256_hex: EMPTY_STRING_SHA256_HASH.to_string(), + custom_header: headers, + object_name: "".to_string(), + query_values: Default::default(), + content_body: ReaderImpl::Body(Bytes::new()), + content_length: 0, + content_md5_base64: "".to_string(), + stream_sha256: false, + trailer: HeaderMap::new(), + pre_sign_url: Default::default(), + add_crc: Default::default(), + extra_pre_sign_header: Default::default(), + bucket_location: Default::default(), + expires: Default::default(), + }) .await?; { @@ -109,26 +106,23 @@ impl TransitionClient { pub async fn remove_bucket(&self, bucket_name: &str) -> Result<(), std::io::Error> { let resp = self - .execute_method( - http::Method::DELETE, - &mut RequestMetadata { - bucket_name: bucket_name.to_string(), - content_sha256_hex: EMPTY_STRING_SHA256_HASH.to_string(), - custom_header: Default::default(), - object_name: "".to_string(), - query_values: Default::default(), - content_body: ReaderImpl::Body(Bytes::new()), - content_length: 0, - content_md5_base64: "".to_string(), - stream_sha256: false, - trailer: HeaderMap::new(), - pre_sign_url: Default::default(), - add_crc: Default::default(), - extra_pre_sign_header: Default::default(), - bucket_location: Default::default(), - expires: Default::default(), - }, - ) + .execute_method(http::Method::DELETE, &mut RequestMetadata { + bucket_name: bucket_name.to_string(), + content_sha256_hex: EMPTY_STRING_SHA256_HASH.to_string(), + custom_header: Default::default(), + object_name: "".to_string(), + query_values: Default::default(), + content_body: ReaderImpl::Body(Bytes::new()), + content_length: 0, + content_md5_base64: "".to_string(), + stream_sha256: false, + trailer: HeaderMap::new(), + pre_sign_url: Default::default(), + add_crc: Default::default(), + extra_pre_sign_header: Default::default(), + bucket_location: Default::default(), + expires: Default::default(), + }) .await?; { @@ -163,26 +157,23 @@ impl TransitionClient { } let resp = self - .execute_method( - http::Method::DELETE, - &mut RequestMetadata { - bucket_name: bucket_name.to_string(), - object_name: object_name.to_string(), - content_sha256_hex: EMPTY_STRING_SHA256_HASH.to_string(), - query_values: url_values, - custom_header: headers, - content_body: ReaderImpl::Body(Bytes::new()), - content_length: 0, - content_md5_base64: "".to_string(), - stream_sha256: false, - trailer: HeaderMap::new(), - pre_sign_url: Default::default(), - add_crc: Default::default(), - extra_pre_sign_header: Default::default(), - bucket_location: Default::default(), - expires: Default::default(), - }, - ) + .execute_method(http::Method::DELETE, &mut RequestMetadata { + bucket_name: bucket_name.to_string(), + object_name: object_name.to_string(), + content_sha256_hex: EMPTY_STRING_SHA256_HASH.to_string(), + query_values: url_values, + custom_header: headers, + content_body: ReaderImpl::Body(Bytes::new()), + content_length: 0, + content_md5_base64: "".to_string(), + stream_sha256: false, + trailer: HeaderMap::new(), + pre_sign_url: Default::default(), + add_crc: Default::default(), + extra_pre_sign_header: Default::default(), + bucket_location: Default::default(), + expires: Default::default(), + }) .await?; Ok(RemoveObjectResult { @@ -277,15 +268,11 @@ impl TransitionClient { while let Some(object) = objects_rx.recv().await { if has_invalid_xml_char(&object.name) { let remove_result = self - .remove_object_inner( - bucket_name, - &object.name, - RemoveObjectOptions { - version_id: object.version_id.expect("err").to_string(), - governance_bypass: opts.governance_bypass, - ..Default::default() - }, - ) + .remove_object_inner(bucket_name, &object.name, RemoveObjectOptions { + version_id: object.version_id.expect("err").to_string(), + governance_bypass: opts.governance_bypass, + ..Default::default() + }) .await?; let remove_result_clone = remove_result.clone(); if !remove_result.err.is_none() { @@ -322,26 +309,23 @@ impl TransitionClient { let remove_bytes = generate_remove_multi_objects_request(&batch); let resp = self - .execute_method( - http::Method::POST, - &mut RequestMetadata { - bucket_name: bucket_name.to_string(), - query_values: url_values.clone(), - content_body: ReaderImpl::Body(Bytes::from(remove_bytes.clone())), - content_length: remove_bytes.len() as i64, - content_md5_base64: base64_encode(&HashAlgorithm::Md5.hash_encode(&remove_bytes).as_ref()), - content_sha256_hex: base64_encode(&HashAlgorithm::SHA256.hash_encode(&remove_bytes).as_ref()), - custom_header: headers, - object_name: "".to_string(), - stream_sha256: false, - trailer: HeaderMap::new(), - pre_sign_url: Default::default(), - add_crc: Default::default(), - extra_pre_sign_header: Default::default(), - bucket_location: Default::default(), - expires: Default::default(), - }, - ) + .execute_method(http::Method::POST, &mut RequestMetadata { + bucket_name: bucket_name.to_string(), + query_values: url_values.clone(), + content_body: ReaderImpl::Body(Bytes::from(remove_bytes.clone())), + content_length: remove_bytes.len() as i64, + content_md5_base64: base64_encode(&HashAlgorithm::Md5.hash_encode(&remove_bytes).as_ref()), + content_sha256_hex: base64_encode(&HashAlgorithm::SHA256.hash_encode(&remove_bytes).as_ref()), + custom_header: headers, + object_name: "".to_string(), + stream_sha256: false, + trailer: HeaderMap::new(), + pre_sign_url: Default::default(), + add_crc: Default::default(), + extra_pre_sign_header: Default::default(), + bucket_location: Default::default(), + expires: Default::default(), + }) .await?; let body_bytes: Vec = resp.body().bytes().expect("err").to_vec(); @@ -369,26 +353,23 @@ impl TransitionClient { url_values.insert("uploadId".to_string(), upload_id.to_string()); let resp = self - .execute_method( - http::Method::DELETE, - &mut RequestMetadata { - bucket_name: bucket_name.to_string(), - object_name: object_name.to_string(), - query_values: url_values, - content_sha256_hex: EMPTY_STRING_SHA256_HASH.to_string(), - custom_header: HeaderMap::new(), - content_body: ReaderImpl::Body(Bytes::new()), - content_length: 0, - content_md5_base64: "".to_string(), - stream_sha256: false, - trailer: HeaderMap::new(), - pre_sign_url: Default::default(), - add_crc: Default::default(), - extra_pre_sign_header: Default::default(), - bucket_location: Default::default(), - expires: Default::default(), - }, - ) + .execute_method(http::Method::DELETE, &mut RequestMetadata { + bucket_name: bucket_name.to_string(), + object_name: object_name.to_string(), + query_values: url_values, + content_sha256_hex: EMPTY_STRING_SHA256_HASH.to_string(), + custom_header: HeaderMap::new(), + content_body: ReaderImpl::Body(Bytes::new()), + content_length: 0, + content_md5_base64: "".to_string(), + stream_sha256: false, + trailer: HeaderMap::new(), + pre_sign_url: Default::default(), + add_crc: Default::default(), + extra_pre_sign_header: Default::default(), + bucket_location: Default::default(), + expires: Default::default(), + }) .await?; //if resp.is_some() { if resp.status() != StatusCode::NO_CONTENT { diff --git a/crates/ecstore/src/client/api_restore.rs b/crates/ecstore/src/client/api_restore.rs index ad11fcf3..48999432 100644 --- a/crates/ecstore/src/client/api_restore.rs +++ b/crates/ecstore/src/client/api_restore.rs @@ -20,12 +20,15 @@ use bytes::Bytes; use http::HeaderMap; +use std::collections::HashMap; use std::io::Cursor; use tokio::io::BufReader; -use std::collections::HashMap; use crate::client::{ - api_error_response::{err_invalid_argument, http_resp_to_error_response}, api_get_object_acl::AccessControlList, api_get_options::GetObjectOptions, transition_api::{to_object_info, ObjectInfo, ReadCloser, ReaderImpl, RequestMetadata, TransitionClient} + api_error_response::{err_invalid_argument, http_resp_to_error_response}, + api_get_object_acl::AccessControlList, + api_get_options::GetObjectOptions, + transition_api::{ObjectInfo, ReadCloser, ReaderImpl, RequestMetadata, TransitionClient, to_object_info}, }; const TIER_STANDARD: &str = "Standard"; @@ -33,55 +36,55 @@ const TIER_BULK: &str = "Bulk"; const TIER_EXPEDITED: &str = "Expedited"; #[derive(Debug, Default, serde::Serialize)] -struct GlacierJobParameters { - tier: String, +pub struct GlacierJobParameters { + pub tier: String, } #[derive(Debug, Default, serde::Serialize, serde::Deserialize)] -struct Encryption { - encryption_type: String, - kms_context: String, - kms_key_id: String, +pub struct Encryption { + pub encryption_type: String, + pub kms_context: String, + pub kms_key_id: String, } #[derive(Debug, Default, serde::Serialize, serde::Deserialize)] -struct MetadataEntry { - name: String, - value: String, +pub struct MetadataEntry { + pub name: String, + pub value: String, } #[derive(Debug, Default, serde::Serialize)] -struct S3 { - access_control_list: AccessControlList, - bucket_name: String, - prefix: String, - canned_acl: String, - encryption: Encryption, - storage_class: String, +pub struct S3 { + pub access_control_list: AccessControlList, + pub bucket_name: String, + pub prefix: String, + pub canned_acl: String, + pub encryption: Encryption, + pub storage_class: String, //tagging: Tags, - user_metadata: MetadataEntry, + pub user_metadata: MetadataEntry, } #[derive(Debug, Default, serde::Serialize)] -struct SelectParameters { - expression_type: String, - expression: String, +pub struct SelectParameters { + pub expression_type: String, + pub expression: String, //input_serialization: SelectObjectInputSerialization, //output_serialization: SelectObjectOutputSerialization, } #[derive(Debug, Default, serde::Serialize)] -struct OutputLocation(S3); +pub struct OutputLocation(pub S3); #[derive(Debug, Default, serde::Serialize)] -struct RestoreRequest { - restore_type: String, - tier: String, - days: i64, - glacier_job_parameters: GlacierJobParameters, - description: String, - select_parameters: SelectParameters, - output_location: OutputLocation, +pub struct RestoreRequest { + pub restore_type: String, + pub tier: String, + pub days: i64, + pub glacier_job_parameters: GlacierJobParameters, + pub description: String, + pub select_parameters: SelectParameters, + pub output_location: OutputLocation, } impl RestoreRequest { @@ -115,7 +118,13 @@ impl RestoreRequest { } impl TransitionClient { - pub async fn restore_object(&self, bucket_name: &str, object_name: &str, version_id: &str, restore_req: &RestoreRequest) -> Result<(), std::io::Error> { + pub async fn restore_object( + &self, + bucket_name: &str, + object_name: &str, + version_id: &str, + restore_req: &RestoreRequest, + ) -> Result<(), std::io::Error> { let restore_request = match serde_xml_rs::to_string(restore_req) { Ok(buf) => buf, Err(e) => { @@ -132,26 +141,23 @@ impl TransitionClient { let restore_request_buffer = Bytes::from(restore_request_bytes.clone()); let resp = self - .execute_method( - http::Method::HEAD, - &mut RequestMetadata { - bucket_name: bucket_name.to_string(), - object_name: object_name.to_string(), - query_values: url_values, - custom_header: HeaderMap::new(), - content_sha256_hex: "".to_string(), //sum_sha256_hex(&restore_request_bytes), - content_md5_base64: "".to_string(), //sum_md5_base64(&restore_request_bytes), - content_body: ReaderImpl::Body(restore_request_buffer), - content_length: restore_request_bytes.len() as i64, - stream_sha256: false, - trailer: HeaderMap::new(), - pre_sign_url: Default::default(), - add_crc: Default::default(), - extra_pre_sign_header: Default::default(), - bucket_location: Default::default(), - expires: Default::default(), - }, - ) + .execute_method(http::Method::HEAD, &mut RequestMetadata { + bucket_name: bucket_name.to_string(), + object_name: object_name.to_string(), + query_values: url_values, + custom_header: HeaderMap::new(), + content_sha256_hex: "".to_string(), //sum_sha256_hex(&restore_request_bytes), + content_md5_base64: "".to_string(), //sum_md5_base64(&restore_request_bytes), + content_body: ReaderImpl::Body(restore_request_buffer), + content_length: restore_request_bytes.len() as i64, + stream_sha256: false, + trailer: HeaderMap::new(), + pre_sign_url: Default::default(), + add_crc: Default::default(), + extra_pre_sign_header: Default::default(), + bucket_location: Default::default(), + expires: Default::default(), + }) .await?; let b = resp.body().bytes().expect("err").to_vec(); @@ -160,4 +166,4 @@ impl TransitionClient { } Ok(()) } -} \ No newline at end of file +} diff --git a/crates/ecstore/src/client/api_stat.rs b/crates/ecstore/src/client/api_stat.rs index b9f209f3..8e0f62f5 100644 --- a/crates/ecstore/src/client/api_stat.rs +++ b/crates/ecstore/src/client/api_stat.rs @@ -21,40 +21,37 @@ use bytes::Bytes; use http::{HeaderMap, HeaderValue}; use rustfs_utils::EMPTY_STRING_SHA256_HASH; -use uuid::Uuid; use std::{collections::HashMap, str::FromStr}; use tokio::io::BufReader; +use uuid::Uuid; -use s3s::header::{X_AMZ_DELETE_MARKER, X_AMZ_VERSION_ID}; use crate::client::{ - api_error_response::{err_invalid_argument, http_resp_to_error_response, ErrorResponse}, + api_error_response::{ErrorResponse, err_invalid_argument, http_resp_to_error_response}, api_get_options::GetObjectOptions, - transition_api::{to_object_info, ObjectInfo, ReadCloser, ReaderImpl, RequestMetadata, TransitionClient}, + transition_api::{ObjectInfo, ReadCloser, ReaderImpl, RequestMetadata, TransitionClient, to_object_info}, }; +use s3s::header::{X_AMZ_DELETE_MARKER, X_AMZ_VERSION_ID}; impl TransitionClient { pub async fn bucket_exists(&self, bucket_name: &str) -> Result { let resp = self - .execute_method( - http::Method::HEAD, - &mut RequestMetadata { - bucket_name: bucket_name.to_string(), - object_name: "".to_string(), - query_values: HashMap::new(), - custom_header: HeaderMap::new(), - content_sha256_hex: EMPTY_STRING_SHA256_HASH.to_string(), - content_md5_base64: "".to_string(), - content_body: ReaderImpl::Body(Bytes::new()), - content_length: 0, - stream_sha256: false, - trailer: HeaderMap::new(), - pre_sign_url: Default::default(), - add_crc: Default::default(), - extra_pre_sign_header: Default::default(), - bucket_location: Default::default(), - expires: Default::default(), - }, - ) + .execute_method(http::Method::HEAD, &mut RequestMetadata { + bucket_name: bucket_name.to_string(), + object_name: "".to_string(), + query_values: HashMap::new(), + custom_header: HeaderMap::new(), + content_sha256_hex: EMPTY_STRING_SHA256_HASH.to_string(), + content_md5_base64: "".to_string(), + content_body: ReaderImpl::Body(Bytes::new()), + content_length: 0, + stream_sha256: false, + trailer: HeaderMap::new(), + pre_sign_url: Default::default(), + add_crc: Default::default(), + extra_pre_sign_header: Default::default(), + bucket_location: Default::default(), + expires: Default::default(), + }) .await; if let Ok(resp) = resp { @@ -70,7 +67,12 @@ impl TransitionClient { Ok(true) } - pub async fn stat_object(&self, bucket_name: &str, object_name: &str, opts: &GetObjectOptions) -> Result { + pub async fn stat_object( + &self, + bucket_name: &str, + object_name: &str, + opts: &GetObjectOptions, + ) -> Result { let mut headers = opts.header(); if opts.internal.replication_delete_marker { headers.insert("X-Source-DeleteMarker", HeaderValue::from_str("true").unwrap()); @@ -80,26 +82,23 @@ impl TransitionClient { } let resp = self - .execute_method( - http::Method::HEAD, - &mut RequestMetadata { - bucket_name: bucket_name.to_string(), - object_name: object_name.to_string(), - query_values: opts.to_query_values(), - custom_header: headers, - content_sha256_hex: EMPTY_STRING_SHA256_HASH.to_string(), - content_md5_base64: "".to_string(), - content_body: ReaderImpl::Body(Bytes::new()), - content_length: 0, - stream_sha256: false, - trailer: HeaderMap::new(), - pre_sign_url: Default::default(), - add_crc: Default::default(), - extra_pre_sign_header: Default::default(), - bucket_location: Default::default(), - expires: Default::default(), - }, - ) + .execute_method(http::Method::HEAD, &mut RequestMetadata { + bucket_name: bucket_name.to_string(), + object_name: object_name.to_string(), + query_values: opts.to_query_values(), + custom_header: headers, + content_sha256_hex: EMPTY_STRING_SHA256_HASH.to_string(), + content_md5_base64: "".to_string(), + content_body: ReaderImpl::Body(Bytes::new()), + content_length: 0, + stream_sha256: false, + trailer: HeaderMap::new(), + pre_sign_url: Default::default(), + add_crc: Default::default(), + extra_pre_sign_header: Default::default(), + bucket_location: Default::default(), + expires: Default::default(), + }) .await; match resp { @@ -107,24 +106,30 @@ impl TransitionClient { let h = resp.headers(); let delete_marker = if let Some(x_amz_delete_marker) = h.get(X_AMZ_DELETE_MARKER.as_str()) { x_amz_delete_marker.to_str().unwrap() == "true" - } else { false }; + } else { + false + }; let replication_ready = if let Some(x_amz_delete_marker) = h.get("X-Replication-Ready") { x_amz_delete_marker.to_str().unwrap() == "true" - } else { false }; + } else { + false + }; if resp.status() != http::StatusCode::OK && resp.status() != http::StatusCode::PARTIAL_CONTENT { if resp.status() == http::StatusCode::METHOD_NOT_ALLOWED && opts.version_id != "" && delete_marker { let err_resp = ErrorResponse { status_code: resp.status(), - code: s3s::S3ErrorCode::MethodNotAllowed, - message: "the specified method is not allowed against this resource.".to_string(), + code: s3s::S3ErrorCode::MethodNotAllowed, + message: "the specified method is not allowed against this resource.".to_string(), bucket_name: bucket_name.to_string(), - key: object_name.to_string(), + key: object_name.to_string(), ..Default::default() }; return Ok(ObjectInfo { - version_id: match Uuid::from_str(h.get(X_AMZ_VERSION_ID).unwrap().to_str().unwrap()) { + version_id: match Uuid::from_str(h.get(X_AMZ_VERSION_ID).unwrap().to_str().unwrap()) { Ok(v) => v, - Err(e) => { return Err(std::io::Error::other(e)); } + Err(e) => { + return Err(std::io::Error::other(e)); + } }, is_delete_marker: delete_marker, ..Default::default() @@ -132,12 +137,14 @@ impl TransitionClient { //err_resp } return Ok(ObjectInfo { - version_id: match Uuid::from_str(h.get(X_AMZ_VERSION_ID).unwrap().to_str().unwrap()) { + version_id: match Uuid::from_str(h.get(X_AMZ_VERSION_ID).unwrap().to_str().unwrap()) { Ok(v) => v, - Err(e) => { return Err(std::io::Error::other(e)); } + Err(e) => { + return Err(std::io::Error::other(e)); + } }, - is_delete_marker: delete_marker, - replication_ready: replication_ready, + is_delete_marker: delete_marker, + replication_ready: replication_ready, ..Default::default() }); //http_resp_to_error_response(resp, bucket_name, object_name) diff --git a/crates/ecstore/src/client/bucket_cache.rs b/crates/ecstore/src/client/bucket_cache.rs index 4e6f5074..b26aedb9 100644 --- a/crates/ecstore/src/client/bucket_cache.rs +++ b/crates/ecstore/src/client/bucket_cache.rs @@ -167,8 +167,7 @@ impl TransitionClient { content_sha256 = UNSIGNED_PAYLOAD.to_string(); } - req - .headers_mut() + req.headers_mut() .insert("X-Amz-Content-Sha256", content_sha256.parse().unwrap()); let req = rustfs_signer::sign_v4(req, 0, &access_key_id, &secret_access_key, &session_token, "us-east-1"); Ok(req) diff --git a/crates/ecstore/src/client/mod.rs b/crates/ecstore/src/client/mod.rs index 7cc781a6..9a87b475 100644 --- a/crates/ecstore/src/client/mod.rs +++ b/crates/ecstore/src/client/mod.rs @@ -16,6 +16,9 @@ pub mod admin_handler_utils; pub mod api_bucket_policy; pub mod api_error_response; pub mod api_get_object; +pub mod api_get_object_acl; +pub mod api_get_object_attributes; +pub mod api_get_object_file; pub mod api_get_options; pub mod api_list; pub mod api_put_object; @@ -24,11 +27,8 @@ pub mod api_put_object_multipart; pub mod api_put_object_streaming; pub mod api_remove; pub mod api_restore; -pub mod api_stat; -pub mod api_get_object_acl; -pub mod api_get_object_attributes; -pub mod api_get_object_file; pub mod api_s3_datatypes; +pub mod api_stat; pub mod bucket_cache; pub mod constants; pub mod credentials; diff --git a/crates/ecstore/src/client/object_handlers_common.rs b/crates/ecstore/src/client/object_handlers_common.rs index 6c380aab..75894e43 100644 --- a/crates/ecstore/src/client/object_handlers_common.rs +++ b/crates/ecstore/src/client/object_handlers_common.rs @@ -31,14 +31,10 @@ pub async fn delete_object_versions(api: ECStore, bucket: &str, to_del: &[Object remaining = &[]; } let vc = BucketVersioningSys::get(bucket).await.expect("err!"); - let _deleted_objs = api.delete_objects( - bucket, - to_del.to_vec(), - ObjectOptions { - //prefix_enabled_fn: vc.prefix_enabled(""), - version_suspended: vc.suspended(), - ..Default::default() - }, - ); + let _deleted_objs = api.delete_objects(bucket, to_del.to_vec(), ObjectOptions { + //prefix_enabled_fn: vc.prefix_enabled(""), + version_suspended: vc.suspended(), + ..Default::default() + }); } } diff --git a/crates/ecstore/src/client/transition_api.rs b/crates/ecstore/src/client/transition_api.rs index 512ec995..a91a7a7d 100644 --- a/crates/ecstore/src/client/transition_api.rs +++ b/crates/ecstore/src/client/transition_api.rs @@ -387,7 +387,11 @@ impl TransitionClient { &metadata.query_values, )?; - let Ok(mut req) = Request::builder().method(method).uri(target_url.to_string()).body(Body::empty()) else { + let Ok(mut req) = Request::builder() + .method(method) + .uri(target_url.to_string()) + .body(Body::empty()) + else { return Err(std::io::Error::other("create request error")); }; @@ -428,13 +432,7 @@ impl TransitionClient { } } if signer_type == SignatureType::SignatureV2 { - req = rustfs_signer::pre_sign_v2( - req, - &access_key_id, - &secret_access_key, - metadata.expires, - is_virtual_host, - ); + req = rustfs_signer::pre_sign_v2(req, &access_key_id, &secret_access_key, metadata.expires, is_virtual_host); } else if signer_type == SignatureType::SignatureV4 { req = rustfs_signer::pre_sign_v4( req, @@ -458,9 +456,7 @@ impl TransitionClient { //req.content_length = metadata.content_length; if metadata.content_length <= -1 { let chunked_value = HeaderValue::from_str(&vec!["chunked"].join(",")).expect("err"); - req - .headers_mut() - .insert(http::header::TRANSFER_ENCODING, chunked_value); + req.headers_mut().insert(http::header::TRANSFER_ENCODING, chunked_value); } if metadata.content_md5_base64.len() > 0 { @@ -473,8 +469,7 @@ impl TransitionClient { } if signer_type == SignatureType::SignatureV2 { - req = - rustfs_signer::sign_v2(req, metadata.content_length, &access_key_id, &secret_access_key, is_virtual_host); + req = rustfs_signer::sign_v2(req, metadata.content_length, &access_key_id, &secret_access_key, is_virtual_host); } else if metadata.stream_sha256 && !self.secure { if metadata.trailer.len() > 0 { for (_, v) in &metadata.trailer { @@ -491,8 +486,8 @@ impl TransitionClient { } else if metadata.trailer.len() > 0 { sha_header = UNSIGNED_PAYLOAD_TRAILER.to_string(); } - req - .headers_mut().insert("X-Amz-Content-Sha256".parse::().unwrap(), sha_header.parse().expect("err")); + req.headers_mut() + .insert("X-Amz-Content-Sha256".parse::().unwrap(), sha_header.parse().expect("err")); req = rustfs_signer::sign_v4_trailer( req, @@ -516,7 +511,7 @@ impl TransitionClient { } Ok(req) - } + } pub fn set_user_agent(&self, req: &mut Request) { let headers = req.headers_mut(); diff --git a/crates/ecstore/src/config/com.rs b/crates/ecstore/src/config/com.rs index 11b45cbc..b73d6e76 100644 --- a/crates/ecstore/src/config/com.rs +++ b/crates/ecstore/src/config/com.rs @@ -70,29 +70,20 @@ pub async fn read_config_with_metadata( } pub async fn save_config(api: Arc, file: &str, data: Vec) -> Result<()> { - save_config_with_opts( - api, - file, - data, - &ObjectOptions { - max_parity: true, - ..Default::default() - }, - ) + save_config_with_opts(api, file, data, &ObjectOptions { + max_parity: true, + ..Default::default() + }) .await } pub async fn delete_config(api: Arc, file: &str) -> Result<()> { match api - .delete_object( - RUSTFS_META_BUCKET, - file, - ObjectOptions { - delete_prefix: true, - delete_prefix_object: true, - ..Default::default() - }, - ) + .delete_object(RUSTFS_META_BUCKET, file, ObjectOptions { + delete_prefix: true, + delete_prefix_object: true, + ..Default::default() + }) .await { Ok(_) => Ok(()), diff --git a/crates/ecstore/src/disk/local.rs b/crates/ecstore/src/disk/local.rs index acb3b97a..a320af41 100644 --- a/crates/ecstore/src/disk/local.rs +++ b/crates/ecstore/src/disk/local.rs @@ -2026,15 +2026,11 @@ impl DiskAPI for LocalDisk { ) -> Result<()> { if path.starts_with(SLASH_SEPARATOR) { return self - .delete( - volume, - path, - DeleteOptions { - recursive: false, - immediate: false, - ..Default::default() - }, - ) + .delete(volume, path, DeleteOptions { + recursive: false, + immediate: false, + ..Default::default() + }) .await; } diff --git a/crates/ecstore/src/erasure_coding/bitrot.rs b/crates/ecstore/src/erasure_coding/bitrot.rs index 587b127c..cc468c3d 100644 --- a/crates/ecstore/src/erasure_coding/bitrot.rs +++ b/crates/ecstore/src/erasure_coding/bitrot.rs @@ -317,13 +317,10 @@ enum WriterType { impl std::fmt::Debug for BitrotWriterWrapper { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("BitrotWriterWrapper") - .field( - "writer_type", - &match self.writer_type { - WriterType::InlineBuffer => "InlineBuffer", - WriterType::Other => "Other", - }, - ) + .field("writer_type", &match self.writer_type { + WriterType::InlineBuffer => "InlineBuffer", + WriterType::Other => "Other", + }) .finish() } } diff --git a/crates/ecstore/src/heal/data_usage.rs b/crates/ecstore/src/heal/data_usage.rs index a6d56997..caf50547 100644 --- a/crates/ecstore/src/heal/data_usage.rs +++ b/crates/ecstore/src/heal/data_usage.rs @@ -174,13 +174,10 @@ pub async fn load_data_usage_from_backend(store: Arc) -> Result) -> Result) { for (tier, st) in &self.tiers { - stats.insert( - tier.clone(), - TierStats { - total_size: st.total_size, - num_versions: st.num_versions, - num_objects: st.num_objects, - }, - ); + stats.insert(tier.clone(), TierStats { + total_size: st.total_size, + num_versions: st.num_versions, + num_objects: st.num_objects, + }); } } } @@ -446,16 +443,10 @@ impl DataUsageCache { let path = Path::new(BUCKET_META_PREFIX).join(name); // warn!("Loading data usage cache from backend: {}", path.display()); match store - .get_object_reader( - RUSTFS_META_BUCKET, - path.to_str().unwrap(), - None, - HeaderMap::new(), - &ObjectOptions { - no_lock: true, - ..Default::default() - }, - ) + .get_object_reader(RUSTFS_META_BUCKET, path.to_str().unwrap(), None, HeaderMap::new(), &ObjectOptions { + no_lock: true, + ..Default::default() + }) .await { Ok(mut reader) => { @@ -469,16 +460,10 @@ impl DataUsageCache { match err { Error::FileNotFound | Error::VolumeNotFound => { match store - .get_object_reader( - RUSTFS_META_BUCKET, - name, - None, - HeaderMap::new(), - &ObjectOptions { - no_lock: true, - ..Default::default() - }, - ) + .get_object_reader(RUSTFS_META_BUCKET, name, None, HeaderMap::new(), &ObjectOptions { + no_lock: true, + ..Default::default() + }) .await { Ok(mut reader) => { @@ -819,18 +804,15 @@ impl DataUsageCache { bui.replica_count = rs.replica_count; for (arn, stat) in rs.targets.iter() { - bui.replication_info.insert( - arn.clone(), - BucketTargetUsageInfo { - replication_pending_size: stat.pending_size, - replicated_size: stat.replicated_size, - replication_failed_size: stat.failed_size, - replication_pending_count: stat.pending_count, - replication_failed_count: stat.failed_count, - replicated_count: stat.replicated_count, - ..Default::default() - }, - ); + bui.replication_info.insert(arn.clone(), BucketTargetUsageInfo { + replication_pending_size: stat.pending_size, + replicated_size: stat.replicated_size, + replication_failed_size: stat.failed_size, + replication_pending_count: stat.pending_count, + replication_failed_count: stat.failed_count, + replicated_count: stat.replicated_count, + ..Default::default() + }); } } dst.insert(bucket.name.clone(), bui); diff --git a/crates/ecstore/src/heal/heal_commands.rs b/crates/ecstore/src/heal/heal_commands.rs index 3df0f3ba..62bb6a08 100644 --- a/crates/ecstore/src/heal/heal_commands.rs +++ b/crates/ecstore/src/heal/heal_commands.rs @@ -255,15 +255,11 @@ impl HealingTracker { pub async fn delete(&self) -> Result<()> { if let Some(disk) = &self.disk { let file_path = Path::new(BUCKET_META_PREFIX).join(HEALING_TRACKER_FILENAME); - disk.delete( - RUSTFS_META_BUCKET, - file_path.to_str().unwrap(), - DeleteOptions { - recursive: false, - immediate: false, - ..Default::default() - }, - ) + disk.delete(RUSTFS_META_BUCKET, file_path.to_str().unwrap(), DeleteOptions { + recursive: false, + immediate: false, + ..Default::default() + }) .await?; } diff --git a/crates/ecstore/src/metrics_realtime.rs b/crates/ecstore/src/metrics_realtime.rs index 298ff846..0fcde38f 100644 --- a/crates/ecstore/src/metrics_realtime.rs +++ b/crates/ecstore/src/metrics_realtime.rs @@ -148,14 +148,11 @@ async fn collect_local_disks_metrics(disks: &HashSet) -> HashMap res, @@ -1239,17 +1224,10 @@ impl ECStore { let mut data = PutObjReader::from_vec(chunk); let pi = match self - .put_object_part( - &bucket, - &object_info.name, - &res.upload_id, - part.number, - &mut data, - &ObjectOptions { - preserve_etag: Some(part.etag.clone()), - ..Default::default() - }, - ) + .put_object_part(&bucket, &object_info.name, &res.upload_id, part.number, &mut data, &ObjectOptions { + preserve_etag: Some(part.etag.clone()), + ..Default::default() + }) .await { Ok(pi) => pi, @@ -1269,17 +1247,11 @@ impl ECStore { if let Err(err) = self .clone() - .complete_multipart_upload( - &bucket, - &object_info.name, - &res.upload_id, - parts, - &ObjectOptions { - data_movement: true, - mod_time: object_info.mod_time, - ..Default::default() - }, - ) + .complete_multipart_upload(&bucket, &object_info.name, &res.upload_id, parts, &ObjectOptions { + data_movement: true, + mod_time: object_info.mod_time, + ..Default::default() + }) .await { error!("decommission_object: complete_multipart_upload err {:?}", &err); @@ -1295,21 +1267,16 @@ impl ECStore { let mut data = PutObjReader::new(hrd); if let Err(err) = self - .put_object( - &bucket, - &object_info.name, - &mut data, - &ObjectOptions { - src_pool_idx: pool_idx, - data_movement: true, - version_id: object_info.version_id.as_ref().map(|v| v.to_string()), - mod_time: object_info.mod_time, - user_defined: object_info.user_defined.clone(), - preserve_etag: object_info.etag.clone(), + .put_object(&bucket, &object_info.name, &mut data, &ObjectOptions { + src_pool_idx: pool_idx, + data_movement: true, + version_id: object_info.version_id.as_ref().map(|v| v.to_string()), + mod_time: object_info.mod_time, + user_defined: object_info.user_defined.clone(), + preserve_etag: object_info.etag.clone(), - ..Default::default() - }, - ) + ..Default::default() + }) .await { error!("decommission_object: put_object err {:?}", &err); @@ -1349,34 +1316,31 @@ impl SetDisks { let cb1 = cb_func.clone(); - list_path_raw( - rx, - ListPathRawOptions { - disks: disks.iter().cloned().map(Some).collect(), - bucket: bucket_info.name.clone(), - path: bucket_info.prefix.clone(), - recursice: true, - min_disks: listing_quorum, - agreed: Some(Box::new(move |entry: MetaCacheEntry| Box::pin(cb1(entry)))), - partial: Some(Box::new(move |entries: MetaCacheEntries, _: &[Option]| { - let resolver = resolver.clone(); - let cb_func = cb_func.clone(); - match entries.resolve(resolver) { - Some(entry) => { - warn!("decommission_pool: list_objects_to_decommission get {}", &entry.name); - Box::pin(async move { - cb_func(entry).await; - }) - } - None => { - warn!("decommission_pool: list_objects_to_decommission get none"); - Box::pin(async {}) - } + list_path_raw(rx, ListPathRawOptions { + disks: disks.iter().cloned().map(Some).collect(), + bucket: bucket_info.name.clone(), + path: bucket_info.prefix.clone(), + recursice: true, + min_disks: listing_quorum, + agreed: Some(Box::new(move |entry: MetaCacheEntry| Box::pin(cb1(entry)))), + partial: Some(Box::new(move |entries: MetaCacheEntries, _: &[Option]| { + let resolver = resolver.clone(); + let cb_func = cb_func.clone(); + match entries.resolve(resolver) { + Some(entry) => { + warn!("decommission_pool: list_objects_to_decommission get {}", &entry.name); + Box::pin(async move { + cb_func(entry).await; + }) } - })), - ..Default::default() - }, - ) + None => { + warn!("decommission_pool: list_objects_to_decommission get none"); + Box::pin(async {}) + } + } + })), + ..Default::default() + }) .await?; Ok(()) diff --git a/crates/ecstore/src/rebalance.rs b/crates/ecstore/src/rebalance.rs index 5ab2f149..ecad7bfd 100644 --- a/crates/ecstore/src/rebalance.rs +++ b/crates/ecstore/src/rebalance.rs @@ -774,20 +774,16 @@ impl ECStore { let mut error = None; if version.deleted { if let Err(err) = set - .delete_object( - &bucket, - &version.name, - ObjectOptions { - versioned: true, - version_id: version_id.clone(), - mod_time: version.mod_time, - src_pool_idx: pool_index, - data_movement: true, - delete_marker: true, - skip_decommissioned: true, - ..Default::default() - }, - ) + .delete_object(&bucket, &version.name, ObjectOptions { + versioned: true, + version_id: version_id.clone(), + mod_time: version.mod_time, + src_pool_idx: pool_index, + data_movement: true, + delete_marker: true, + skip_decommissioned: true, + ..Default::default() + }) .await { if is_err_object_not_found(&err) || is_err_version_not_found(&err) || is_err_data_movement_overwrite(&err) { @@ -883,16 +879,12 @@ impl ECStore { if rebalanced == fivs.versions.len() { if let Err(err) = set - .delete_object( - bucket.as_str(), - &encode_dir_object(&entry.name), - ObjectOptions { - delete_prefix: true, - delete_prefix_object: true, + .delete_object(bucket.as_str(), &encode_dir_object(&entry.name), ObjectOptions { + delete_prefix: true, + delete_prefix_object: true, - ..Default::default() - }, - ) + ..Default::default() + }) .await { error!("rebalance_entry: delete_object err {:?}", &err); @@ -911,17 +903,13 @@ impl ECStore { if object_info.is_multipart() { let res = match self - .new_multipart_upload( - &bucket, - &object_info.name, - &ObjectOptions { - version_id: object_info.version_id.as_ref().map(|v| v.to_string()), - user_defined: object_info.user_defined.clone(), - src_pool_idx: pool_idx, - data_movement: true, - ..Default::default() - }, - ) + .new_multipart_upload(&bucket, &object_info.name, &ObjectOptions { + version_id: object_info.version_id.as_ref().map(|v| v.to_string()), + user_defined: object_info.user_defined.clone(), + src_pool_idx: pool_idx, + data_movement: true, + ..Default::default() + }) .await { Ok(res) => res, @@ -955,17 +943,10 @@ impl ECStore { let mut data = PutObjReader::from_vec(chunk); let pi = match self - .put_object_part( - &bucket, - &object_info.name, - &res.upload_id, - part.number, - &mut data, - &ObjectOptions { - preserve_etag: Some(part.etag.clone()), - ..Default::default() - }, - ) + .put_object_part(&bucket, &object_info.name, &res.upload_id, part.number, &mut data, &ObjectOptions { + preserve_etag: Some(part.etag.clone()), + ..Default::default() + }) .await { Ok(pi) => pi, @@ -983,17 +964,11 @@ impl ECStore { if let Err(err) = self .clone() - .complete_multipart_upload( - &bucket, - &object_info.name, - &res.upload_id, - parts, - &ObjectOptions { - data_movement: true, - mod_time: object_info.mod_time, - ..Default::default() - }, - ) + .complete_multipart_upload(&bucket, &object_info.name, &res.upload_id, parts, &ObjectOptions { + data_movement: true, + mod_time: object_info.mod_time, + ..Default::default() + }) .await { error!("rebalance_object: complete_multipart_upload err {:?}", &err); @@ -1008,21 +983,16 @@ impl ECStore { let mut data = PutObjReader::new(hrd); if let Err(err) = self - .put_object( - &bucket, - &object_info.name, - &mut data, - &ObjectOptions { - src_pool_idx: pool_idx, - data_movement: true, - version_id: object_info.version_id.as_ref().map(|v| v.to_string()), - mod_time: object_info.mod_time, - user_defined: object_info.user_defined.clone(), - preserve_etag: object_info.etag.clone(), + .put_object(&bucket, &object_info.name, &mut data, &ObjectOptions { + src_pool_idx: pool_idx, + data_movement: true, + version_id: object_info.version_id.as_ref().map(|v| v.to_string()), + mod_time: object_info.mod_time, + user_defined: object_info.user_defined.clone(), + preserve_etag: object_info.etag.clone(), - ..Default::default() - }, - ) + ..Default::default() + }) .await { error!("rebalance_object: put_object err {:?}", &err); @@ -1167,36 +1137,33 @@ impl SetDisks { }; let cb1 = cb.clone(); - list_path_raw( - rx, - ListPathRawOptions { - disks: disks.iter().cloned().map(Some).collect(), - bucket: bucket.clone(), - recursice: true, - min_disks: listing_quorum, - agreed: Some(Box::new(move |entry: MetaCacheEntry| { - info!("list_objects_to_rebalance: agreed: {:?}", &entry.name); - Box::pin(cb1(entry)) - })), - partial: Some(Box::new(move |entries: MetaCacheEntries, _: &[Option]| { - // let cb = cb.clone(); - let resolver = resolver.clone(); - let cb = cb.clone(); + list_path_raw(rx, ListPathRawOptions { + disks: disks.iter().cloned().map(Some).collect(), + bucket: bucket.clone(), + recursice: true, + min_disks: listing_quorum, + agreed: Some(Box::new(move |entry: MetaCacheEntry| { + info!("list_objects_to_rebalance: agreed: {:?}", &entry.name); + Box::pin(cb1(entry)) + })), + partial: Some(Box::new(move |entries: MetaCacheEntries, _: &[Option]| { + // let cb = cb.clone(); + let resolver = resolver.clone(); + let cb = cb.clone(); - match entries.resolve(resolver) { - Some(entry) => { - info!("list_objects_to_rebalance: list_objects_to_decommission get {}", &entry.name); - Box::pin(async move { cb(entry).await }) - } - None => { - info!("list_objects_to_rebalance: list_objects_to_decommission get none"); - Box::pin(async {}) - } + match entries.resolve(resolver) { + Some(entry) => { + info!("list_objects_to_rebalance: list_objects_to_decommission get {}", &entry.name); + Box::pin(async move { cb(entry).await }) } - })), - ..Default::default() - }, - ) + None => { + info!("list_objects_to_rebalance: list_objects_to_decommission get none"); + Box::pin(async {}) + } + } + })), + ..Default::default() + }) .await?; info!("list_objects_to_rebalance: list_objects_to_rebalance done"); diff --git a/crates/ecstore/src/rpc/tonic_service.rs b/crates/ecstore/src/rpc/tonic_service.rs index faaa3726..21ec9eb3 100644 --- a/crates/ecstore/src/rpc/tonic_service.rs +++ b/crates/ecstore/src/rpc/tonic_service.rs @@ -262,13 +262,10 @@ impl Node for NodeService { let request = request.into_inner(); match self .local_peer - .delete_bucket( - &request.bucket, - &DeleteBucketOptions { - force: false, - ..Default::default() - }, - ) + .delete_bucket(&request.bucket, &DeleteBucketOptions { + force: false, + ..Default::default() + }) .await { Ok(_) => Ok(tonic::Response::new(DeleteBucketResponse { diff --git a/crates/ecstore/src/set_disk.rs b/crates/ecstore/src/set_disk.rs index a0b61907..91b2b539 100644 --- a/crates/ecstore/src/set_disk.rs +++ b/crates/ecstore/src/set_disk.rs @@ -406,17 +406,11 @@ impl SetDisks { let src_object = src_object.clone(); futures.push(tokio::spawn(async move { let _ = disk - .delete_version( - &src_bucket, - &src_object, - fi, - false, - DeleteOptions { - undo_write: true, - old_data_dir, - ..Default::default() - }, - ) + .delete_version(&src_bucket, &src_object, fi, false, DeleteOptions { + undo_write: true, + old_data_dir, + ..Default::default() + }) .await .map_err(|e| { debug!("rename_data delete_version err {:?}", e); @@ -489,14 +483,10 @@ impl SetDisks { tokio::spawn(async move { if let Some(disk) = disk { (disk - .delete( - &bucket, - &file_path, - DeleteOptions { - recursive: true, - ..Default::default() - }, - ) + .delete(&bucket, &file_path, DeleteOptions { + recursive: true, + ..Default::default() + }) .await) .err() } else { @@ -695,14 +685,10 @@ impl SetDisks { if let Some(disk) = disks[i].as_ref() { let _ = disk - .delete( - bucket, - &path_join_buf(&[prefix, STORAGE_FORMAT_FILE]), - DeleteOptions { - recursive: true, - ..Default::default() - }, - ) + .delete(bucket, &path_join_buf(&[prefix, STORAGE_FORMAT_FILE]), DeleteOptions { + recursive: true, + ..Default::default() + }) .await .map_err(|e| { warn!("write meta revert err {:?}", e); @@ -1681,14 +1667,10 @@ impl SetDisks { for disk in disks.iter() { futures.push(async move { if let Some(disk) = disk { - disk.delete( - bucket, - prefix, - DeleteOptions { - recursive: true, - ..Default::default() - }, - ) + disk.delete(bucket, prefix, DeleteOptions { + recursive: true, + ..Default::default() + }) .await } else { Err(DiskError::DiskNotFound) @@ -2409,17 +2391,10 @@ impl SetDisks { // Allow for dangling deletes, on versions that have DataDir missing etc. // this would end up restoring the correct readable versions. return match self - .delete_if_dang_ling( - bucket, - object, - &parts_metadata, - &errs, - &data_errs_by_part, - ObjectOptions { - version_id: version_id_op.clone(), - ..Default::default() - }, - ) + .delete_if_dang_ling(bucket, object, &parts_metadata, &errs, &data_errs_by_part, ObjectOptions { + version_id: version_id_op.clone(), + ..Default::default() + }) .await { Ok(m) => { @@ -2725,15 +2700,11 @@ impl SetDisks { if parts_metadata[index].is_remote() { let rm_data_dir = parts_metadata[index].data_dir.unwrap().to_string(); let d_path = Path::new(&encode_dir_object(object)).join(rm_data_dir); - disk.delete( - bucket, - d_path.to_str().unwrap(), - DeleteOptions { - immediate: true, - recursive: true, - ..Default::default() - }, - ) + disk.delete(bucket, d_path.to_str().unwrap(), DeleteOptions { + immediate: true, + recursive: true, + ..Default::default() + }) .await?; } @@ -2756,17 +2727,10 @@ impl SetDisks { Err(err) => { let data_errs_by_part = HashMap::new(); match self - .delete_if_dang_ling( - bucket, - object, - &parts_metadata, - &errs, - &data_errs_by_part, - ObjectOptions { - version_id: version_id_op.clone(), - ..Default::default() - }, - ) + .delete_if_dang_ling(bucket, object, &parts_metadata, &errs, &data_errs_by_part, ObjectOptions { + version_id: version_id_op.clone(), + ..Default::default() + }) .await { Ok(m) => { @@ -2822,15 +2786,11 @@ impl SetDisks { let object = object.to_string(); futures.push(tokio::spawn(async move { let _ = disk - .delete( - &bucket, - &object, - DeleteOptions { - recursive: false, - immediate: false, - ..Default::default() - }, - ) + .delete(&bucket, &object, DeleteOptions { + recursive: false, + immediate: false, + ..Default::default() + }) .await; })); } @@ -3525,16 +3485,11 @@ impl SetDisks { Ok(fivs) => fivs, Err(err) => { match self_clone - .heal_object( - &bucket, - &encoded_entry_name, - "", - &HealOpts { - scan_mode, - remove: HEAL_DELETE_DANGLING, - ..Default::default() - }, - ) + .heal_object(&bucket, &encoded_entry_name, "", &HealOpts { + scan_mode, + remove: HEAL_DELETE_DANGLING, + ..Default::default() + }) .await { Ok((res, None)) => { @@ -3651,56 +3606,53 @@ impl SetDisks { let bucket_partial = bucket.clone(); let heal_entry_agree = heal_entry.clone(); let heal_entry_partial = heal_entry.clone(); - if let Err(err) = list_path_raw( - rx, - ListPathRawOptions { - disks, - fallback_disks, - bucket: bucket.clone(), - recursice: true, - forward_to, - min_disks: 1, - report_not_found: false, - agreed: Some(Box::new(move |entry: MetaCacheEntry| { - let jt = jt_agree.clone(); - let bucket = bucket_agree.clone(); - let heal_entry = heal_entry_agree.clone(); - Box::pin(async move { + if let Err(err) = list_path_raw(rx, ListPathRawOptions { + disks, + fallback_disks, + bucket: bucket.clone(), + recursice: true, + forward_to, + min_disks: 1, + report_not_found: false, + agreed: Some(Box::new(move |entry: MetaCacheEntry| { + let jt = jt_agree.clone(); + let bucket = bucket_agree.clone(); + let heal_entry = heal_entry_agree.clone(); + Box::pin(async move { + jt.take().await; + let bucket = bucket.clone(); + tokio::spawn(async move { + heal_entry(bucket, entry).await; + }); + }) + })), + partial: Some(Box::new(move |entries: MetaCacheEntries, _: &[Option]| { + let jt = jt_partial.clone(); + let bucket = bucket_partial.clone(); + let heal_entry = heal_entry_partial.clone(); + Box::pin({ + let heal_entry = heal_entry.clone(); + let resolver = resolver.clone(); + async move { + let entry = if let Some(entry) = entries.resolve(resolver) { + entry + } else if let (Some(entry), _) = entries.first_found() { + entry + } else { + return; + }; jt.take().await; let bucket = bucket.clone(); + let heal_entry = heal_entry.clone(); tokio::spawn(async move { heal_entry(bucket, entry).await; }); - }) - })), - partial: Some(Box::new(move |entries: MetaCacheEntries, _: &[Option]| { - let jt = jt_partial.clone(); - let bucket = bucket_partial.clone(); - let heal_entry = heal_entry_partial.clone(); - Box::pin({ - let heal_entry = heal_entry.clone(); - let resolver = resolver.clone(); - async move { - let entry = if let Some(entry) = entries.resolve(resolver) { - entry - } else if let (Some(entry), _) = entries.first_found() { - entry - } else { - return; - }; - jt.take().await; - let bucket = bucket.clone(); - let heal_entry = heal_entry.clone(); - tokio::spawn(async move { - heal_entry(bucket, entry).await; - }); - } - }) - })), - finished: None, - ..Default::default() - }, - ) + } + }) + })), + finished: None, + ..Default::default() + }) .await { ret_err = Some(err.into()); @@ -3743,15 +3695,11 @@ impl SetDisks { let prefix = prefix.to_string(); futures.push(async move { if let Some(disk) = disk_op { - disk.delete( - &bucket, - &prefix, - DeleteOptions { - recursive: true, - immediate: true, - ..Default::default() - }, - ) + disk.delete(&bucket, &prefix, DeleteOptions { + recursive: true, + immediate: true, + ..Default::default() + }) .await } else { Ok(()) diff --git a/crates/ecstore/src/sets.rs b/crates/ecstore/src/sets.rs index 28109afc..da86a830 100644 --- a/crates/ecstore/src/sets.rs +++ b/crates/ecstore/src/sets.rs @@ -557,14 +557,11 @@ impl StorageAPI for Sets { let idx = self.get_hashed_set_index(obj.object_name.as_str()); if !set_obj_map.contains_key(&idx) { - set_obj_map.insert( - idx, - vec![DelObj { - // set_idx: idx, - orig_idx: i, - obj: obj.clone(), - }], - ); + set_obj_map.insert(idx, vec![DelObj { + // set_idx: idx, + orig_idx: i, + obj: obj.clone(), + }]); } else if let Some(val) = set_obj_map.get_mut(&idx) { val.push(DelObj { // set_idx: idx, @@ -756,13 +753,10 @@ impl StorageAPI for Sets { #[tracing::instrument(skip(self))] async fn heal_format(&self, dry_run: bool) -> Result<(HealResultItem, Option)> { - let (disks, _) = init_storage_disks_with_errors( - &self.endpoints.endpoints, - &DiskOption { - cleanup: false, - health_check: false, - }, - ) + let (disks, _) = init_storage_disks_with_errors(&self.endpoints.endpoints, &DiskOption { + cleanup: false, + health_check: false, + }) .await; let (formats, errs) = load_format_erasure_all(&disks, true).await; if let Err(err) = check_format_erasure_values(&formats, self.set_drive_count) { diff --git a/crates/ecstore/src/store.rs b/crates/ecstore/src/store.rs index bff22f7d..5e69333d 100644 --- a/crates/ecstore/src/store.rs +++ b/crates/ecstore/src/store.rs @@ -154,13 +154,10 @@ impl ECStore { // validate_parity(partiy_count, pool_eps.drives_per_set)?; - let (disks, errs) = store_init::init_disks( - &pool_eps.endpoints, - &DiskOption { - cleanup: true, - health_check: true, - }, - ) + let (disks, errs) = store_init::init_disks(&pool_eps.endpoints, &DiskOption { + cleanup: true, + health_check: true, + }) .await; check_disk_fatal_errs(&errs)?; @@ -504,14 +501,10 @@ impl ECStore { } async fn delete_prefix(&self, bucket: &str, object: &str) -> Result<()> { for pool in self.pools.iter() { - pool.delete_object( - bucket, - object, - ObjectOptions { - delete_prefix: true, - ..Default::default() - }, - ) + pool.delete_object(bucket, object, ObjectOptions { + delete_prefix: true, + ..Default::default() + }) .await?; } @@ -621,15 +614,11 @@ impl ECStore { async fn get_pool_idx(&self, bucket: &str, object: &str, size: i64) -> Result { let idx = match self - .get_pool_idx_existing_with_opts( - bucket, - object, - &ObjectOptions { - skip_decommissioned: true, - skip_rebalancing: true, - ..Default::default() - }, - ) + .get_pool_idx_existing_with_opts(bucket, object, &ObjectOptions { + skip_decommissioned: true, + skip_rebalancing: true, + ..Default::default() + }) .await { Ok(res) => res, @@ -670,16 +659,12 @@ impl ECStore { } async fn get_pool_idx_existing_no_lock(&self, bucket: &str, object: &str) -> Result { - self.get_pool_idx_existing_with_opts( - bucket, - object, - &ObjectOptions { - no_lock: true, - skip_decommissioned: true, - skip_rebalancing: true, - ..Default::default() - }, - ) + self.get_pool_idx_existing_with_opts(bucket, object, &ObjectOptions { + no_lock: true, + skip_decommissioned: true, + skip_rebalancing: true, + ..Default::default() + }) .await } @@ -1374,14 +1359,11 @@ impl StorageAPI for ECStore { if let Err(err) = self.peer_sys.make_bucket(bucket, opts).await { if !is_err_bucket_exists(&err.into()) { let _ = self - .delete_bucket( - bucket, - &DeleteBucketOptions { - no_lock: true, - no_recreate: true, - ..Default::default() - }, - ) + .delete_bucket(bucket, &DeleteBucketOptions { + no_lock: true, + no_recreate: true, + ..Default::default() + }) .await; } }; @@ -1684,14 +1666,10 @@ impl StorageAPI for ECStore { for obj in objects.iter() { futures.push(async move { - self.internal_get_pool_info_existing_with_opts( - bucket, - &obj.object_name, - &ObjectOptions { - no_lock: true, - ..Default::default() - }, - ) + self.internal_get_pool_info_existing_with_opts(bucket, &obj.object_name, &ObjectOptions { + no_lock: true, + ..Default::default() + }) .await }); } diff --git a/crates/ecstore/src/store_list_objects.rs b/crates/ecstore/src/store_list_objects.rs index fa6c4d7f..8c9e0c3d 100644 --- a/crates/ecstore/src/store_list_objects.rs +++ b/crates/ecstore/src/store_list_objects.rs @@ -263,14 +263,10 @@ impl ECStore { // use get if !opts.prefix.is_empty() && opts.limit == 1 && opts.marker.is_none() { match self - .get_object_info( - &opts.bucket, - &opts.prefix, - &ObjectOptions { - no_lock: true, - ..Default::default() - }, - ) + .get_object_info(&opts.bucket, &opts.prefix, &ObjectOptions { + no_lock: true, + ..Default::default() + }) .await { Ok(res) => { @@ -769,48 +765,45 @@ impl ECStore { let tx1 = sender.clone(); let tx2 = sender.clone(); - list_path_raw( - rx.resubscribe(), - ListPathRawOptions { - disks: disks.iter().cloned().map(Some).collect(), - fallback_disks: fallback_disks.iter().cloned().map(Some).collect(), - bucket: bucket.to_owned(), - path, - recursice: true, - filter_prefix: Some(filter_prefix), - forward_to: opts.marker.clone(), - min_disks: listing_quorum, - per_disk_limit: opts.limit as i32, - agreed: Some(Box::new(move |entry: MetaCacheEntry| { - Box::pin({ - let value = tx1.clone(); - async move { - if entry.is_dir() { - return; - } + list_path_raw(rx.resubscribe(), ListPathRawOptions { + disks: disks.iter().cloned().map(Some).collect(), + fallback_disks: fallback_disks.iter().cloned().map(Some).collect(), + bucket: bucket.to_owned(), + path, + recursice: true, + filter_prefix: Some(filter_prefix), + forward_to: opts.marker.clone(), + min_disks: listing_quorum, + per_disk_limit: opts.limit as i32, + agreed: Some(Box::new(move |entry: MetaCacheEntry| { + Box::pin({ + let value = tx1.clone(); + async move { + if entry.is_dir() { + return; + } + if let Err(err) = value.send(entry).await { + error!("list_path send fail {:?}", err); + } + } + }) + })), + partial: Some(Box::new(move |entries: MetaCacheEntries, _: &[Option]| { + Box::pin({ + let value = tx2.clone(); + let resolver = resolver.clone(); + async move { + if let Some(entry) = entries.resolve(resolver) { if let Err(err) = value.send(entry).await { error!("list_path send fail {:?}", err); } } - }) - })), - partial: Some(Box::new(move |entries: MetaCacheEntries, _: &[Option]| { - Box::pin({ - let value = tx2.clone(); - let resolver = resolver.clone(); - async move { - if let Some(entry) = entries.resolve(resolver) { - if let Err(err) = value.send(entry).await { - error!("list_path send fail {:?}", err); - } - } - } - }) - })), - finished: None, - ..Default::default() - }, - ) + } + }) + })), + finished: None, + ..Default::default() + }) .await }); } @@ -1279,45 +1272,42 @@ impl SetDisks { let tx1 = sender.clone(); let tx2 = sender.clone(); - list_path_raw( - rx, - ListPathRawOptions { - disks: disks.iter().cloned().map(Some).collect(), - fallback_disks: fallback_disks.iter().cloned().map(Some).collect(), - bucket: opts.bucket, - path: opts.base_dir, - recursice: opts.recursive, - filter_prefix: opts.filter_prefix, - forward_to: opts.marker, - min_disks: listing_quorum, - per_disk_limit: limit, - agreed: Some(Box::new(move |entry: MetaCacheEntry| { - Box::pin({ - let value = tx1.clone(); - async move { + list_path_raw(rx, ListPathRawOptions { + disks: disks.iter().cloned().map(Some).collect(), + fallback_disks: fallback_disks.iter().cloned().map(Some).collect(), + bucket: opts.bucket, + path: opts.base_dir, + recursice: opts.recursive, + filter_prefix: opts.filter_prefix, + forward_to: opts.marker, + min_disks: listing_quorum, + per_disk_limit: limit, + agreed: Some(Box::new(move |entry: MetaCacheEntry| { + Box::pin({ + let value = tx1.clone(); + async move { + if let Err(err) = value.send(entry).await { + error!("list_path send fail {:?}", err); + } + } + }) + })), + partial: Some(Box::new(move |entries: MetaCacheEntries, _: &[Option]| { + Box::pin({ + let value = tx2.clone(); + let resolver = resolver.clone(); + async move { + if let Some(entry) = entries.resolve(resolver) { if let Err(err) = value.send(entry).await { error!("list_path send fail {:?}", err); } } - }) - })), - partial: Some(Box::new(move |entries: MetaCacheEntries, _: &[Option]| { - Box::pin({ - let value = tx2.clone(); - let resolver = resolver.clone(); - async move { - if let Some(entry) = entries.resolve(resolver) { - if let Err(err) = value.send(entry).await { - error!("list_path send fail {:?}", err); - } - } - } - }) - })), - finished: None, - ..Default::default() - }, - ) + } + }) + })), + finished: None, + ..Default::default() + }) .await .map_err(Error::other) } diff --git a/crates/ecstore/src/tier/tier.rs b/crates/ecstore/src/tier/tier.rs index fdfed498..9293ff15 100644 --- a/crates/ecstore/src/tier/tier.rs +++ b/crates/ecstore/src/tier/tier.rs @@ -365,15 +365,10 @@ impl TierConfigMgr { file: &str, data: Bytes, ) -> std::result::Result<(), std::io::Error> { - self.save_config_with_opts( - api, - file, - data, - &ObjectOptions { - max_parity: true, - ..Default::default() - }, - ) + self.save_config_with_opts(api, file, data, &ObjectOptions { + max_parity: true, + ..Default::default() + }) .await } diff --git a/crates/ecstore/src/tier/warm_backend_minio.rs b/crates/ecstore/src/tier/warm_backend_minio.rs index 73da4acf..b8eeccfe 100644 --- a/crates/ecstore/src/tier/warm_backend_minio.rs +++ b/crates/ecstore/src/tier/warm_backend_minio.rs @@ -101,19 +101,13 @@ impl WarmBackend for WarmBackendMinIO { let part_size = optimal_part_size(length)?; let client = self.0.client.clone(); let res = client - .put_object( - &self.0.bucket, - &self.0.get_dest(object), - r, - length, - &PutObjectOptions { - storage_class: self.0.storage_class.clone(), - part_size: part_size as u64, - disable_content_sha256: true, - user_metadata: meta, - ..Default::default() - }, - ) + .put_object(&self.0.bucket, &self.0.get_dest(object), r, length, &PutObjectOptions { + storage_class: self.0.storage_class.clone(), + part_size: part_size as u64, + disable_content_sha256: true, + user_metadata: meta, + ..Default::default() + }) .await?; //self.ToObjectError(err, object) Ok(res.version_id) diff --git a/crates/ecstore/src/tier/warm_backend_rustfs.rs b/crates/ecstore/src/tier/warm_backend_rustfs.rs index 8bc8142b..79e79a90 100644 --- a/crates/ecstore/src/tier/warm_backend_rustfs.rs +++ b/crates/ecstore/src/tier/warm_backend_rustfs.rs @@ -98,19 +98,13 @@ impl WarmBackend for WarmBackendRustFS { let part_size = optimal_part_size(length)?; let client = self.0.client.clone(); let res = client - .put_object( - &self.0.bucket, - &self.0.get_dest(object), - r, - length, - &PutObjectOptions { - storage_class: self.0.storage_class.clone(), - part_size: part_size as u64, - disable_content_sha256: true, - user_metadata: meta, - ..Default::default() - }, - ) + .put_object(&self.0.bucket, &self.0.get_dest(object), r, length, &PutObjectOptions { + storage_class: self.0.storage_class.clone(), + part_size: part_size as u64, + disable_content_sha256: true, + user_metadata: meta, + ..Default::default() + }) .await?; //self.ToObjectError(err, object) Ok(res.version_id) diff --git a/crates/ecstore/src/tier/warm_backend_s3.rs b/crates/ecstore/src/tier/warm_backend_s3.rs index f6de4b75..7609e6fd 100644 --- a/crates/ecstore/src/tier/warm_backend_s3.rs +++ b/crates/ecstore/src/tier/warm_backend_s3.rs @@ -127,18 +127,12 @@ impl WarmBackend for WarmBackendS3 { ) -> Result { let client = self.client.clone(); let res = client - .put_object( - &self.bucket, - &self.get_dest(object), - r, - length, - &PutObjectOptions { - send_content_md5: true, - storage_class: self.storage_class.clone(), - user_metadata: meta, - ..Default::default() - }, - ) + .put_object(&self.bucket, &self.get_dest(object), r, length, &PutObjectOptions { + send_content_md5: true, + storage_class: self.storage_class.clone(), + user_metadata: meta, + ..Default::default() + }) .await?; Ok(res.version_id) } diff --git a/crates/iam/src/manager.rs b/crates/iam/src/manager.rs index c0831ce1..a2bdf0b9 100644 --- a/crates/iam/src/manager.rs +++ b/crates/iam/src/manager.rs @@ -1575,14 +1575,11 @@ pub fn get_default_policyes() -> HashMap { default_policies .iter() .map(|(n, p)| { - ( - n.to_string(), - PolicyDoc { - version: 1, - policy: p.clone(), - ..Default::default() - }, - ) + (n.to_string(), PolicyDoc { + version: 1, + policy: p.clone(), + ..Default::default() + }) }) .collect() } diff --git a/crates/lock/src/local_locker.rs b/crates/lock/src/local_locker.rs index ef9676c5..8a65d5b6 100644 --- a/crates/lock/src/local_locker.rs +++ b/crates/lock/src/local_locker.rs @@ -140,20 +140,17 @@ impl Locker for LocalLocker { } args.resources.iter().enumerate().for_each(|(idx, resource)| { - self.lock_map.insert( - resource.to_string(), - vec![LockRequesterInfo { - name: resource.to_string(), - writer: true, - source: args.source.to_string(), - owner: args.owner.to_string(), - uid: args.uid.to_string(), - group: args.resources.len() > 1, - quorum: args.quorum, - idx, - ..Default::default() - }], - ); + self.lock_map.insert(resource.to_string(), vec![LockRequesterInfo { + name: resource.to_string(), + writer: true, + source: args.source.to_string(), + owner: args.owner.to_string(), + uid: args.uid.to_string(), + group: args.resources.len() > 1, + quorum: args.quorum, + idx, + ..Default::default() + }]); let mut uuid = args.uid.to_string(); format_uuid(&mut uuid, &idx); @@ -230,18 +227,15 @@ impl Locker for LocalLocker { } } None => { - self.lock_map.insert( - resource.to_string(), - vec![LockRequesterInfo { - name: resource.to_string(), - writer: false, - source: args.source.to_string(), - owner: args.owner.to_string(), - uid: args.uid.to_string(), - quorum: args.quorum, - ..Default::default() - }], - ); + self.lock_map.insert(resource.to_string(), vec![LockRequesterInfo { + name: resource.to_string(), + writer: false, + source: args.source.to_string(), + owner: args.owner.to_string(), + uid: args.uid.to_string(), + quorum: args.quorum, + ..Default::default() + }]); } } let mut uuid = args.uid.to_string(); diff --git a/crates/obs/examples/server.rs b/crates/obs/examples/server.rs index fc413957..f1bea80c 100644 --- a/crates/obs/examples/server.rs +++ b/crates/obs/examples/server.rs @@ -46,10 +46,10 @@ async fn run(bucket: String, object: String, user: String, service_name: String) // Record Metrics let meter = global::meter("rustfs"); let request_duration = meter.f64_histogram("s3_request_duration_seconds").build(); - request_duration.record( - start_time.elapsed().unwrap().as_secs_f64(), - &[opentelemetry::KeyValue::new("operation", "run")], - ); + request_duration.record(start_time.elapsed().unwrap().as_secs_f64(), &[opentelemetry::KeyValue::new( + "operation", + "run", + )]); match SystemObserver::init_process_observer(meter).await { Ok(_) => info!("Process observer initialized successfully"), @@ -84,10 +84,10 @@ async fn put_object(bucket: String, object: String, user: String) { let meter = global::meter("rustfs"); let request_duration = meter.f64_histogram("s3_request_duration_seconds").build(); - request_duration.record( - start_time.elapsed().unwrap().as_secs_f64(), - &[opentelemetry::KeyValue::new("operation", "put_object")], - ); + request_duration.record(start_time.elapsed().unwrap().as_secs_f64(), &[opentelemetry::KeyValue::new( + "operation", + "put_object", + )]); info!( "Starting PUT operation content: bucket = {}, object = {}, user = {},start_time = {}", diff --git a/crates/obs/src/system/collector.rs b/crates/obs/src/system/collector.rs index ecaa0510..68f628dd 100644 --- a/crates/obs/src/system/collector.rs +++ b/crates/obs/src/system/collector.rs @@ -115,24 +115,18 @@ impl Collector { let transmitted = data.transmitted() as i64; self.metrics.network_io_per_interface.record( received, - &[ - &self.attributes.attributes[..], - &[ - KeyValue::new(INTERFACE, interface_name.to_string()), - KeyValue::new(DIRECTION, "received"), - ], - ] + &[&self.attributes.attributes[..], &[ + KeyValue::new(INTERFACE, interface_name.to_string()), + KeyValue::new(DIRECTION, "received"), + ]] .concat(), ); self.metrics.network_io_per_interface.record( transmitted, - &[ - &self.attributes.attributes[..], - &[ - KeyValue::new(INTERFACE, interface_name.to_string()), - KeyValue::new(DIRECTION, "transmitted"), - ], - ] + &[&self.attributes.attributes[..], &[ + KeyValue::new(INTERFACE, interface_name.to_string()), + KeyValue::new(DIRECTION, "transmitted"), + ]] .concat(), ); } @@ -155,10 +149,10 @@ impl Collector { }; self.metrics.process_status.record( status_value, - &[ - &self.attributes.attributes[..], - &[KeyValue::new(STATUS, format!("{:?}", process.status()))], - ] + &[&self.attributes.attributes[..], &[KeyValue::new( + STATUS, + format!("{:?}", process.status()), + )]] .concat(), ); diff --git a/crates/policy/src/policy/policy.rs b/crates/policy/src/policy/policy.rs index 1a3f5ef2..01ef5181 100644 --- a/crates/policy/src/policy/policy.rs +++ b/crates/policy/src/policy/policy.rs @@ -276,12 +276,150 @@ pub mod default { #[allow(clippy::incompatible_msrv)] pub static DEFAULT_POLICIES: LazyLock<[(&'static str, Policy); 6]> = LazyLock::new(|| { [ - ( - "readwrite", - Policy { - id: "".into(), - version: DEFAULT_VERSION.into(), - statements: vec![Statement { + ("readwrite", Policy { + id: "".into(), + version: DEFAULT_VERSION.into(), + statements: vec![Statement { + sid: "".into(), + effect: Effect::Allow, + actions: ActionSet({ + let mut hash_set = HashSet::new(); + hash_set.insert(Action::S3Action(S3Action::AllActions)); + hash_set + }), + not_actions: ActionSet(Default::default()), + resources: ResourceSet({ + let mut hash_set = HashSet::new(); + hash_set.insert(Resource::S3("*".into())); + hash_set + }), + conditions: Functions::default(), + ..Default::default() + }], + }), + ("readonly", Policy { + id: "".into(), + version: DEFAULT_VERSION.into(), + statements: vec![Statement { + sid: "".into(), + effect: Effect::Allow, + actions: ActionSet({ + let mut hash_set = HashSet::new(); + hash_set.insert(Action::S3Action(S3Action::GetBucketLocationAction)); + hash_set.insert(Action::S3Action(S3Action::GetObjectAction)); + hash_set + }), + not_actions: ActionSet(Default::default()), + resources: ResourceSet({ + let mut hash_set = HashSet::new(); + hash_set.insert(Resource::S3("*".into())); + hash_set + }), + conditions: Functions::default(), + ..Default::default() + }], + }), + ("writeonly", Policy { + id: "".into(), + version: DEFAULT_VERSION.into(), + statements: vec![Statement { + sid: "".into(), + effect: Effect::Allow, + actions: ActionSet({ + let mut hash_set = HashSet::new(); + hash_set.insert(Action::S3Action(S3Action::PutObjectAction)); + hash_set + }), + not_actions: ActionSet(Default::default()), + resources: ResourceSet({ + let mut hash_set = HashSet::new(); + hash_set.insert(Resource::S3("*".into())); + hash_set + }), + conditions: Functions::default(), + ..Default::default() + }], + }), + ("writeonly", Policy { + id: "".into(), + version: DEFAULT_VERSION.into(), + statements: vec![Statement { + sid: "".into(), + effect: Effect::Allow, + actions: ActionSet({ + let mut hash_set = HashSet::new(); + hash_set.insert(Action::S3Action(S3Action::PutObjectAction)); + hash_set + }), + not_actions: ActionSet(Default::default()), + resources: ResourceSet({ + let mut hash_set = HashSet::new(); + hash_set.insert(Resource::S3("*".into())); + hash_set + }), + conditions: Functions::default(), + ..Default::default() + }], + }), + ("diagnostics", Policy { + id: "".into(), + version: DEFAULT_VERSION.into(), + statements: vec![Statement { + sid: "".into(), + effect: Effect::Allow, + actions: ActionSet({ + let mut hash_set = HashSet::new(); + hash_set.insert(Action::AdminAction(AdminAction::ProfilingAdminAction)); + hash_set.insert(Action::AdminAction(AdminAction::TraceAdminAction)); + hash_set.insert(Action::AdminAction(AdminAction::ConsoleLogAdminAction)); + hash_set.insert(Action::AdminAction(AdminAction::ServerInfoAdminAction)); + hash_set.insert(Action::AdminAction(AdminAction::TopLocksAdminAction)); + hash_set.insert(Action::AdminAction(AdminAction::HealthInfoAdminAction)); + hash_set.insert(Action::AdminAction(AdminAction::PrometheusAdminAction)); + hash_set.insert(Action::AdminAction(AdminAction::BandwidthMonitorAction)); + hash_set + }), + not_actions: ActionSet(Default::default()), + resources: ResourceSet({ + let mut hash_set = HashSet::new(); + hash_set.insert(Resource::S3("*".into())); + hash_set + }), + conditions: Functions::default(), + ..Default::default() + }], + }), + ("consoleAdmin", Policy { + id: "".into(), + version: DEFAULT_VERSION.into(), + statements: vec![ + Statement { + sid: "".into(), + effect: Effect::Allow, + actions: ActionSet({ + let mut hash_set = HashSet::new(); + hash_set.insert(Action::AdminAction(AdminAction::AllAdminActions)); + hash_set + }), + not_actions: ActionSet(Default::default()), + resources: ResourceSet(HashSet::new()), + conditions: Functions::default(), + ..Default::default() + }, + Statement { + sid: "".into(), + effect: Effect::Allow, + actions: ActionSet({ + let mut hash_set = HashSet::new(); + hash_set.insert(Action::KmsAction(KmsAction::AllActions)); + hash_set + }), + not_actions: ActionSet(Default::default()), + resources: ResourceSet(HashSet::new()), + conditions: Functions::default(), + ..Default::default() + }, + Statement { sid: "".into(), effect: Effect::Allow, actions: ActionSet({ @@ -297,165 +435,9 @@ pub mod default { }), conditions: Functions::default(), ..Default::default() - }], - }, - ), - ( - "readonly", - Policy { - id: "".into(), - version: DEFAULT_VERSION.into(), - statements: vec![Statement { - sid: "".into(), - effect: Effect::Allow, - actions: ActionSet({ - let mut hash_set = HashSet::new(); - hash_set.insert(Action::S3Action(S3Action::GetBucketLocationAction)); - hash_set.insert(Action::S3Action(S3Action::GetObjectAction)); - hash_set - }), - not_actions: ActionSet(Default::default()), - resources: ResourceSet({ - let mut hash_set = HashSet::new(); - hash_set.insert(Resource::S3("*".into())); - hash_set - }), - conditions: Functions::default(), - ..Default::default() - }], - }, - ), - ( - "writeonly", - Policy { - id: "".into(), - version: DEFAULT_VERSION.into(), - statements: vec![Statement { - sid: "".into(), - effect: Effect::Allow, - actions: ActionSet({ - let mut hash_set = HashSet::new(); - hash_set.insert(Action::S3Action(S3Action::PutObjectAction)); - hash_set - }), - not_actions: ActionSet(Default::default()), - resources: ResourceSet({ - let mut hash_set = HashSet::new(); - hash_set.insert(Resource::S3("*".into())); - hash_set - }), - conditions: Functions::default(), - ..Default::default() - }], - }, - ), - ( - "writeonly", - Policy { - id: "".into(), - version: DEFAULT_VERSION.into(), - statements: vec![Statement { - sid: "".into(), - effect: Effect::Allow, - actions: ActionSet({ - let mut hash_set = HashSet::new(); - hash_set.insert(Action::S3Action(S3Action::PutObjectAction)); - hash_set - }), - not_actions: ActionSet(Default::default()), - resources: ResourceSet({ - let mut hash_set = HashSet::new(); - hash_set.insert(Resource::S3("*".into())); - hash_set - }), - conditions: Functions::default(), - ..Default::default() - }], - }, - ), - ( - "diagnostics", - Policy { - id: "".into(), - version: DEFAULT_VERSION.into(), - statements: vec![Statement { - sid: "".into(), - effect: Effect::Allow, - actions: ActionSet({ - let mut hash_set = HashSet::new(); - hash_set.insert(Action::AdminAction(AdminAction::ProfilingAdminAction)); - hash_set.insert(Action::AdminAction(AdminAction::TraceAdminAction)); - hash_set.insert(Action::AdminAction(AdminAction::ConsoleLogAdminAction)); - hash_set.insert(Action::AdminAction(AdminAction::ServerInfoAdminAction)); - hash_set.insert(Action::AdminAction(AdminAction::TopLocksAdminAction)); - hash_set.insert(Action::AdminAction(AdminAction::HealthInfoAdminAction)); - hash_set.insert(Action::AdminAction(AdminAction::PrometheusAdminAction)); - hash_set.insert(Action::AdminAction(AdminAction::BandwidthMonitorAction)); - hash_set - }), - not_actions: ActionSet(Default::default()), - resources: ResourceSet({ - let mut hash_set = HashSet::new(); - hash_set.insert(Resource::S3("*".into())); - hash_set - }), - conditions: Functions::default(), - ..Default::default() - }], - }, - ), - ( - "consoleAdmin", - Policy { - id: "".into(), - version: DEFAULT_VERSION.into(), - statements: vec![ - Statement { - sid: "".into(), - effect: Effect::Allow, - actions: ActionSet({ - let mut hash_set = HashSet::new(); - hash_set.insert(Action::AdminAction(AdminAction::AllAdminActions)); - hash_set - }), - not_actions: ActionSet(Default::default()), - resources: ResourceSet(HashSet::new()), - conditions: Functions::default(), - ..Default::default() - }, - Statement { - sid: "".into(), - effect: Effect::Allow, - actions: ActionSet({ - let mut hash_set = HashSet::new(); - hash_set.insert(Action::KmsAction(KmsAction::AllActions)); - hash_set - }), - not_actions: ActionSet(Default::default()), - resources: ResourceSet(HashSet::new()), - conditions: Functions::default(), - ..Default::default() - }, - Statement { - sid: "".into(), - effect: Effect::Allow, - actions: ActionSet({ - let mut hash_set = HashSet::new(); - hash_set.insert(Action::S3Action(S3Action::AllActions)); - hash_set - }), - not_actions: ActionSet(Default::default()), - resources: ResourceSet({ - let mut hash_set = HashSet::new(); - hash_set.insert(Resource::S3("*".into())); - hash_set - }), - conditions: Functions::default(), - ..Default::default() - }, - ], - }, - ), + }, + ], + }), ] }); } diff --git a/crates/protos/src/main.rs b/crates/protos/src/main.rs index 5e48c4d5..0a7118b2 100644 --- a/crates/protos/src/main.rs +++ b/crates/protos/src/main.rs @@ -85,13 +85,9 @@ fn main() -> Result<(), AnyError> { Err(_) => "flatc".to_string(), }; - compile_flatbuffers_models( - &mut generated_mod_rs, - &flatc_path, - proto_dir.clone(), - flatbuffer_out_dir.clone(), - vec!["models"], - )?; + compile_flatbuffers_models(&mut generated_mod_rs, &flatc_path, proto_dir.clone(), flatbuffer_out_dir.clone(), vec![ + "models", + ])?; fmt(); Ok(()) diff --git a/crates/signer/src/request_signature_streaming.rs b/crates/signer/src/request_signature_streaming.rs index 7f6c2ff0..f4c36b17 100644 --- a/crates/signer/src/request_signature_streaming.rs +++ b/crates/signer/src/request_signature_streaming.rs @@ -17,9 +17,9 @@ use lazy_static::lazy_static; use std::collections::HashMap; use time::{OffsetDateTime, macros::format_description}; -use s3s::Body; use super::request_signature_v4::{SERVICE_TYPE_S3, get_scope, get_signature, get_signing_key}; use rustfs_utils::hash::EMPTY_STRING_SHA256_HASH; +use s3s::Body; const STREAMING_SIGN_ALGORITHM: &str = "STREAMING-AWS4-HMAC-SHA256-PAYLOAD"; const STREAMING_SIGN_TRAILER_ALGORITHM: &str = "STREAMING-AWS4-HMAC-SHA256-PAYLOAD-TRAILER"; diff --git a/crates/signer/src/request_signature_v2.rs b/crates/signer/src/request_signature_v2.rs index ded0c92c..17666705 100644 --- a/crates/signer/src/request_signature_v2.rs +++ b/crates/signer/src/request_signature_v2.rs @@ -19,15 +19,14 @@ use std::collections::HashMap; use std::fmt::Write; use time::{OffsetDateTime, format_description}; -use s3s::Body; use super::utils::get_host_addr; use rustfs_utils::crypto::{base64_encode, hex, hmac_sha1}; +use s3s::Body; const _SIGN_V4_ALGORITHM: &str = "AWS4-HMAC-SHA256"; const SIGN_V2_ALGORITHM: &str = "AWS"; fn encode_url2path(req: &request::Request, _virtual_host: bool) -> String { - req.uri().path().to_string() } diff --git a/crates/signer/src/request_signature_v4.rs b/crates/signer/src/request_signature_v4.rs index 95f14a29..5f89df8c 100644 --- a/crates/signer/src/request_signature_v4.rs +++ b/crates/signer/src/request_signature_v4.rs @@ -22,11 +22,11 @@ use std::fmt::Write; use time::{OffsetDateTime, macros::format_description}; use tracing::debug; -use rustfs_utils::crypto::{hex, hex_sha256, hmac_sha256}; -use s3s::Body; use super::constants::UNSIGNED_PAYLOAD; use super::request_signature_streaming_unsigned_trailer::streaming_unsigned_v4; use super::utils::{get_host_addr, sign_v4_trim_all}; +use rustfs_utils::crypto::{hex, hex_sha256, hmac_sha256}; +use s3s::Body; pub const SIGN_V4_ALGORITHM: &str = "AWS4-HMAC-SHA256"; pub const SERVICE_TYPE_S3: &str = "s3"; @@ -270,7 +270,6 @@ pub fn pre_sign_v4( .unwrap(), ); - *req.uri_mut() = Uri::from_parts(parts).unwrap(); req @@ -282,10 +281,16 @@ fn _post_pre_sign_signature_v4(policy_base64: &str, t: OffsetDateTime, secret_ac get_signature(signing_key, policy_base64) } -fn _sign_v4_sts(req: request::Request, access_key_id: &str, secret_access_key: &str, location: &str) -> request::Request { +fn _sign_v4_sts( + req: request::Request, + access_key_id: &str, + secret_access_key: &str, + location: &str, +) -> request::Request { sign_v4_inner(req, 0, access_key_id, secret_access_key, "", location, SERVICE_TYPE_STS, HeaderMap::new()) } +#[allow(clippy::too_many_arguments)] fn sign_v4_inner( mut req: request::Request, content_len: i64, diff --git a/rustfs/src/admin/handlers.rs b/rustfs/src/admin/handlers.rs index 924d66ca..ecf98848 100644 --- a/rustfs/src/admin/handlers.rs +++ b/rustfs/src/admin/handlers.rs @@ -1016,7 +1016,6 @@ impl Operation for RemoveRemoteTargetHandler { ))); }; - let mut need_delete = true; if let Some(arnstr) = querys.get("arn") { @@ -1047,7 +1046,6 @@ impl Operation for RemoveRemoteTargetHandler { error!("need delete target is {}", decoded_str); bucket_targets::remove_bucket_target(bucket, arnstr).await; } - } // List bucket targets and return as JSON to client // match bucket_targets::list_bucket_targets(bucket).await { diff --git a/rustfs/src/admin/handlers/bucket_meta.rs b/rustfs/src/admin/handlers/bucket_meta.rs index 074349bc..a5f0fea8 100644 --- a/rustfs/src/admin/handlers/bucket_meta.rs +++ b/rustfs/src/admin/handlers/bucket_meta.rs @@ -453,13 +453,10 @@ impl Operation for ImportBucketMetadata { // create bucket if not exists if !bucket_metadatas.contains_key(bucket_name) { if let Err(e) = store - .make_bucket( - bucket_name, - &MakeBucketOptions { - force_create: true, - ..Default::default() - }, - ) + .make_bucket(bucket_name, &MakeBucketOptions { + force_create: true, + ..Default::default() + }) .await { warn!("create bucket failed: {e}"); diff --git a/rustfs/src/admin/handlers/user.rs b/rustfs/src/admin/handlers/user.rs index 64ebcb41..60c97859 100644 --- a/rustfs/src/admin/handlers/user.rs +++ b/rustfs/src/admin/handlers/user.rs @@ -445,20 +445,17 @@ impl Operation for ExportIam { let users: HashMap = users .into_iter() .map(|(k, v)| { - ( - k, - AddOrUpdateUserReq { - secret_key: v.credentials.secret_key, - status: { - if v.credentials.status == "off" { - AccountStatus::Disabled - } else { - AccountStatus::Enabled - } - }, - policy: None, + (k, AddOrUpdateUserReq { + secret_key: v.credentials.secret_key, + status: { + if v.credentials.status == "off" { + AccountStatus::Disabled + } else { + AccountStatus::Enabled + } }, - ) + policy: None, + }) }) .collect::>(); diff --git a/rustfs/src/storage/ecfs.rs b/rustfs/src/storage/ecfs.rs index 60561264..a6b52487 100644 --- a/rustfs/src/storage/ecfs.rs +++ b/rustfs/src/storage/ecfs.rs @@ -310,14 +310,11 @@ impl S3 for FS { }; store - .make_bucket( - &bucket, - &MakeBucketOptions { - force_create: true, - lock_enabled: object_lock_enabled_for_bucket.is_some_and(|v| v), - ..Default::default() - }, - ) + .make_bucket(&bucket, &MakeBucketOptions { + force_create: true, + lock_enabled: object_lock_enabled_for_bucket.is_some_and(|v| v), + ..Default::default() + }) .await .map_err(ApiError::from)?; @@ -498,13 +495,10 @@ impl S3 for FS { }; store - .delete_bucket( - &input.bucket, - &DeleteBucketOptions { - force: false, - ..Default::default() - }, - ) + .delete_bucket(&input.bucket, &DeleteBucketOptions { + force: false, + ..Default::default() + }) .await .map_err(ApiError::from)?; @@ -1976,7 +1970,7 @@ impl S3 for FS { .. } = req.input; - let mut lr_retention = false; + let lr_retention = false; /*let rcfg = metadata_sys::get_object_lock_config(&bucket).await; if let Ok(rcfg) = rcfg { if let Some(rule) = rcfg.0.rule {