fix: make lint build and clippy happy (#71)

Signed-off-by: yihong0618 <zouzou0208@gmail.com>
This commit is contained in:
yihong
2025-07-07 09:55:53 +08:00
committed by GitHub
parent 040b05c318
commit abd5dff9b5
47 changed files with 1091 additions and 1379 deletions

4
Cargo.lock generated
View File

@@ -8312,8 +8312,12 @@ dependencies = [
"http 1.3.1",
"hyper 1.6.0",
"lazy_static",
"rand 0.9.1",
"rustfs-utils",
"s3s",
"serde",
"serde_urlencoded",
"tempfile",
"time",
"tracing",
]

View File

@@ -568,14 +568,11 @@ mod tests {
let mut latency = LastMinuteLatency::default();
// Add data at time 1000
latency.add_all(
1000,
&AccElem {
total: 10,
size: 0,
n: 1,
},
);
latency.add_all(1000, &AccElem {
total: 10,
size: 0,
n: 1,
});
// Forward to time 1030 (30 seconds later)
latency.forward_to(1030);

View File

@@ -236,13 +236,10 @@ impl BucketMetadataSys {
futures.push(async move {
sleep(Duration::from_millis(30)).await;
let _ = api
.heal_bucket(
&bucket,
&HealOpts {
recreate: true,
..Default::default()
},
)
.heal_bucket(&bucket, &HealOpts {
recreate: true,
..Default::default()
})
.await;
load_bucket_metadata(self.api.clone(), bucket.as_str()).await
});

View File

@@ -74,26 +74,23 @@ impl TransitionClient {
url_values.insert("policy".to_string(), "".to_string());
let resp = self
.execute_method(
http::Method::DELETE,
&mut RequestMetadata {
bucket_name: bucket_name.to_string(),
query_values: url_values,
content_sha256_hex: EMPTY_STRING_SHA256_HASH.to_string(),
object_name: "".to_string(),
custom_header: HeaderMap::new(),
content_body: ReaderImpl::Body(Bytes::new()),
content_length: 0,
content_md5_base64: "".to_string(),
stream_sha256: false,
trailer: HeaderMap::new(),
pre_sign_url: Default::default(),
add_crc: Default::default(),
extra_pre_sign_header: Default::default(),
bucket_location: Default::default(),
expires: Default::default(),
},
)
.execute_method(http::Method::DELETE, &mut RequestMetadata {
bucket_name: bucket_name.to_string(),
query_values: url_values,
content_sha256_hex: EMPTY_STRING_SHA256_HASH.to_string(),
object_name: "".to_string(),
custom_header: HeaderMap::new(),
content_body: ReaderImpl::Body(Bytes::new()),
content_length: 0,
content_md5_base64: "".to_string(),
stream_sha256: false,
trailer: HeaderMap::new(),
pre_sign_url: Default::default(),
add_crc: Default::default(),
extra_pre_sign_header: Default::default(),
bucket_location: Default::default(),
expires: Default::default(),
})
.await?;
//defer closeResponse(resp)
@@ -114,26 +111,23 @@ impl TransitionClient {
url_values.insert("policy".to_string(), "".to_string());
let resp = self
.execute_method(
http::Method::GET,
&mut RequestMetadata {
bucket_name: bucket_name.to_string(),
query_values: url_values,
content_sha256_hex: EMPTY_STRING_SHA256_HASH.to_string(),
object_name: "".to_string(),
custom_header: HeaderMap::new(),
content_body: ReaderImpl::Body(Bytes::new()),
content_length: 0,
content_md5_base64: "".to_string(),
stream_sha256: false,
trailer: HeaderMap::new(),
pre_sign_url: Default::default(),
add_crc: Default::default(),
extra_pre_sign_header: Default::default(),
bucket_location: Default::default(),
expires: Default::default(),
},
)
.execute_method(http::Method::GET, &mut RequestMetadata {
bucket_name: bucket_name.to_string(),
query_values: url_values,
content_sha256_hex: EMPTY_STRING_SHA256_HASH.to_string(),
object_name: "".to_string(),
custom_header: HeaderMap::new(),
content_body: ReaderImpl::Body(Bytes::new()),
content_length: 0,
content_md5_base64: "".to_string(),
stream_sha256: false,
trailer: HeaderMap::new(),
pre_sign_url: Default::default(),
add_crc: Default::default(),
extra_pre_sign_header: Default::default(),
bucket_location: Default::default(),
expires: Default::default(),
})
.await?;
let policy = String::from_utf8_lossy(&resp.body().bytes().expect("err").to_vec()).to_string();

View File

@@ -43,26 +43,23 @@ impl TransitionClient {
opts: &GetObjectOptions,
) -> Result<(ObjectInfo, HeaderMap, ReadCloser), std::io::Error> {
let resp = self
.execute_method(
http::Method::GET,
&mut RequestMetadata {
bucket_name: bucket_name.to_string(),
object_name: object_name.to_string(),
query_values: opts.to_query_values(),
custom_header: opts.header(),
content_sha256_hex: EMPTY_STRING_SHA256_HASH.to_string(),
content_body: ReaderImpl::Body(Bytes::new()),
content_length: 0,
content_md5_base64: "".to_string(),
stream_sha256: false,
trailer: HeaderMap::new(),
pre_sign_url: Default::default(),
add_crc: Default::default(),
extra_pre_sign_header: Default::default(),
bucket_location: Default::default(),
expires: Default::default(),
},
)
.execute_method(http::Method::GET, &mut RequestMetadata {
bucket_name: bucket_name.to_string(),
object_name: object_name.to_string(),
query_values: opts.to_query_values(),
custom_header: opts.header(),
content_sha256_hex: EMPTY_STRING_SHA256_HASH.to_string(),
content_body: ReaderImpl::Body(Bytes::new()),
content_length: 0,
content_md5_base64: "".to_string(),
stream_sha256: false,
trailer: HeaderMap::new(),
pre_sign_url: Default::default(),
add_crc: Default::default(),
extra_pre_sign_header: Default::default(),
bucket_location: Default::default(),
expires: Default::default(),
})
.await?;
let resp = &resp;

View File

@@ -21,40 +21,40 @@
use bytes::Bytes;
use http::{HeaderMap, HeaderValue};
use s3s::dto::Owner;
use std::io::Cursor;
use std::collections::HashMap;
use std::io::Cursor;
use tokio::io::BufReader;
use rustfs_utils::EMPTY_STRING_SHA256_HASH;
use crate::client::{
api_error_response::{err_invalid_argument, http_resp_to_error_response},
api_get_options::GetObjectOptions,
transition_api::{to_object_info, ObjectInfo, ReadCloser, ReaderImpl, RequestMetadata, TransitionClient},
transition_api::{ObjectInfo, ReadCloser, ReaderImpl, RequestMetadata, TransitionClient, to_object_info},
};
use rustfs_utils::EMPTY_STRING_SHA256_HASH;
#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
struct Grantee {
id: String,
display_name: String,
uri: String,
pub struct Grantee {
pub id: String,
pub display_name: String,
pub uri: String,
}
#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
struct Grant {
grantee: Grantee,
permission: String,
pub struct Grant {
pub grantee: Grantee,
pub permission: String,
}
#[derive(Debug, Default, serde::Serialize, serde::Deserialize)]
pub struct AccessControlList {
pub grant: Vec<Grant>,
pub grant: Vec<Grant>,
pub permission: String,
}
#[derive(Debug, Default, serde::Deserialize)]
pub struct AccessControlPolicy {
#[serde(skip)]
owner: Owner,
owner: Owner,
pub access_control_list: AccessControlList,
}
@@ -63,26 +63,23 @@ impl TransitionClient {
let mut url_values = HashMap::new();
url_values.insert("acl".to_string(), "".to_string());
let mut resp = self
.execute_method(
http::Method::GET,
&mut RequestMetadata {
bucket_name: bucket_name.to_string(),
object_name: object_name.to_string(),
query_values: url_values,
custom_header: HeaderMap::new(),
content_sha256_hex: EMPTY_STRING_SHA256_HASH.to_string(),
content_body: ReaderImpl::Body(Bytes::new()),
content_length: 0,
content_md5_base64: "".to_string(),
stream_sha256: false,
trailer: HeaderMap::new(),
pre_sign_url: Default::default(),
add_crc: Default::default(),
extra_pre_sign_header: Default::default(),
bucket_location: Default::default(),
expires: Default::default(),
},
)
.execute_method(http::Method::GET, &mut RequestMetadata {
bucket_name: bucket_name.to_string(),
object_name: object_name.to_string(),
query_values: url_values,
custom_header: HeaderMap::new(),
content_sha256_hex: EMPTY_STRING_SHA256_HASH.to_string(),
content_body: ReaderImpl::Body(Bytes::new()),
content_length: 0,
content_md5_base64: "".to_string(),
stream_sha256: false,
trailer: HeaderMap::new(),
pre_sign_url: Default::default(),
add_crc: Default::default(),
extra_pre_sign_header: Default::default(),
bucket_location: Default::default(),
expires: Default::default(),
})
.await?;
if resp.status() != http::StatusCode::OK {
@@ -98,7 +95,9 @@ impl TransitionClient {
}
};
let mut obj_info = self.stat_object(bucket_name, object_name, &GetObjectOptions::default()).await?;
let mut obj_info = self
.stat_object(bucket_name, object_name, &GetObjectOptions::default())
.await?;
obj_info.owner.display_name = res.owner.display_name.clone();
obj_info.owner.id = res.owner.id.clone();
@@ -107,7 +106,9 @@ impl TransitionClient {
let canned_acl = get_canned_acl(&res);
if canned_acl != "" {
obj_info.metadata.insert("X-Amz-Acl", HeaderValue::from_str(&canned_acl).unwrap());
obj_info
.metadata
.insert("X-Amz-Acl", HeaderValue::from_str(&canned_acl).unwrap());
return Ok(obj_info);
}

View File

@@ -20,41 +20,43 @@
use bytes::Bytes;
use http::{HeaderMap, HeaderValue};
use time::OffsetDateTime;
use std::io::Cursor;
use std::collections::HashMap;
use std::io::Cursor;
use time::OffsetDateTime;
use tokio::io::BufReader;
use s3s::{Body, dto::Owner};
use s3s::header::{X_AMZ_OBJECT_ATTRIBUTES, X_AMZ_DELETE_MARKER, X_AMZ_METADATA_DIRECTIVE, X_AMZ_VERSION_ID,
X_AMZ_REQUEST_CHARGED, X_AMZ_RESTORE, X_AMZ_PART_NUMBER_MARKER, X_AMZ_MAX_PARTS,};
use rustfs_utils::EMPTY_STRING_SHA256_HASH;
use crate::client::constants::{GET_OBJECT_ATTRIBUTES_MAX_PARTS, GET_OBJECT_ATTRIBUTES_TAGS, ISO8601_DATEFORMAT};
use rustfs_utils::EMPTY_STRING_SHA256_HASH;
use s3s::header::{
X_AMZ_DELETE_MARKER, X_AMZ_MAX_PARTS, X_AMZ_METADATA_DIRECTIVE, X_AMZ_OBJECT_ATTRIBUTES, X_AMZ_PART_NUMBER_MARKER,
X_AMZ_REQUEST_CHARGED, X_AMZ_RESTORE, X_AMZ_VERSION_ID,
};
use s3s::{Body, dto::Owner};
use crate::client::{
api_error_response::err_invalid_argument,
api_get_object_acl::AccessControlPolicy,
api_get_options::GetObjectOptions,
transition_api::{ObjectInfo, ReadCloser, ReaderImpl, RequestMetadata, TransitionClient, to_object_info},
api_get_object_acl::AccessControlPolicy,
};
struct ObjectAttributesOptions {
max_parts: i64,
version_id: String,
part_number_marker: i64,
pub struct ObjectAttributesOptions {
pub max_parts: i64,
pub version_id: String,
pub part_number_marker: i64,
//server_side_encryption: encrypt::ServerSide,
}
struct ObjectAttributes {
version_id: String,
last_modified: OffsetDateTime,
object_attributes_response: ObjectAttributesResponse,
pub struct ObjectAttributes {
pub version_id: String,
pub last_modified: OffsetDateTime,
pub object_attributes_response: ObjectAttributesResponse,
}
impl ObjectAttributes {
fn new() -> Self {
Self {
version_id: "".to_string(),
version_id: "".to_string(),
last_modified: OffsetDateTime::now_utc(),
object_attributes_response: ObjectAttributesResponse::new(),
}
@@ -63,81 +65,81 @@ impl ObjectAttributes {
#[derive(Debug, Default, serde::Deserialize)]
pub struct Checksum {
checksum_crc32: String,
checksum_crc32: String,
checksum_crc32c: String,
checksum_sha1: String,
checksum_sha1: String,
checksum_sha256: String,
}
impl Checksum {
fn new() -> Self {
Self {
checksum_crc32: "".to_string(),
checksum_crc32: "".to_string(),
checksum_crc32c: "".to_string(),
checksum_sha1: "".to_string(),
checksum_sha1: "".to_string(),
checksum_sha256: "".to_string(),
}
}
}
#[derive(Debug, Default, serde::Deserialize)]
struct ObjectParts {
parts_count: i64,
part_number_marker: i64,
next_part_number_marker: i64,
max_parts: i64,
is_truncated: bool,
parts: Vec<ObjectAttributePart>,
pub struct ObjectParts {
pub parts_count: i64,
pub part_number_marker: i64,
pub next_part_number_marker: i64,
pub max_parts: i64,
is_truncated: bool,
parts: Vec<ObjectAttributePart>,
}
impl ObjectParts {
fn new() -> Self {
Self {
parts_count: 0,
part_number_marker: 0,
parts_count: 0,
part_number_marker: 0,
next_part_number_marker: 0,
max_parts: 0,
is_truncated: false,
parts: Vec::new(),
max_parts: 0,
is_truncated: false,
parts: Vec::new(),
}
}
}
#[derive(Debug, Default, serde::Deserialize)]
struct ObjectAttributesResponse {
etag: String,
storage_class: String,
object_size: i64,
checksum: Checksum,
object_parts: ObjectParts,
pub struct ObjectAttributesResponse {
pub etag: String,
pub storage_class: String,
pub object_size: i64,
pub checksum: Checksum,
pub object_parts: ObjectParts,
}
impl ObjectAttributesResponse {
fn new() -> Self {
Self {
etag: "".to_string(),
etag: "".to_string(),
storage_class: "".to_string(),
object_size: 0,
checksum: Checksum::new(),
object_parts: ObjectParts::new(),
object_size: 0,
checksum: Checksum::new(),
object_parts: ObjectParts::new(),
}
}
}
#[derive(Debug, Default, serde::Deserialize)]
struct ObjectAttributePart {
checksum_crc32: String,
checksum_crc32: String,
checksum_crc32c: String,
checksum_sha1: String,
checksum_sha1: String,
checksum_sha256: String,
part_number: i64,
size: i64,
part_number: i64,
size: i64,
}
impl ObjectAttributes {
pub async fn parse_response(&mut self, resp: &mut http::Response<Body>) -> Result<(), std::io::Error> {
let h = resp.headers();
let mod_time = OffsetDateTime::parse(h.get("Last-Modified").unwrap().to_str().unwrap(), ISO8601_DATEFORMAT).unwrap(); //RFC7231Time
let mod_time = OffsetDateTime::parse(h.get("Last-Modified").unwrap().to_str().unwrap(), ISO8601_DATEFORMAT).unwrap(); //RFC7231Time
self.last_modified = mod_time;
self.version_id = h.get(X_AMZ_VERSION_ID).unwrap().to_str().unwrap().to_string();
@@ -155,7 +157,12 @@ impl ObjectAttributes {
}
impl TransitionClient {
pub async fn get_object_attributes(&self, bucket_name: &str, object_name: &str, opts: ObjectAttributesOptions) -> Result<ObjectAttributes, std::io::Error> {
pub async fn get_object_attributes(
&self,
bucket_name: &str,
object_name: &str,
opts: ObjectAttributesOptions,
) -> Result<ObjectAttributes, std::io::Error> {
let mut url_values = HashMap::new();
url_values.insert("attributes".to_string(), "".to_string());
if opts.version_id != "" {
@@ -166,13 +173,19 @@ impl TransitionClient {
headers.insert(X_AMZ_OBJECT_ATTRIBUTES, HeaderValue::from_str(GET_OBJECT_ATTRIBUTES_TAGS).unwrap());
if opts.part_number_marker > 0 {
headers.insert(X_AMZ_PART_NUMBER_MARKER, HeaderValue::from_str(&opts.part_number_marker.to_string()).unwrap());
headers.insert(
X_AMZ_PART_NUMBER_MARKER,
HeaderValue::from_str(&opts.part_number_marker.to_string()).unwrap(),
);
}
if opts.max_parts > 0 {
headers.insert(X_AMZ_MAX_PARTS, HeaderValue::from_str(&opts.max_parts.to_string()).unwrap());
} else {
headers.insert(X_AMZ_MAX_PARTS, HeaderValue::from_str(&GET_OBJECT_ATTRIBUTES_MAX_PARTS.to_string()).unwrap());
headers.insert(
X_AMZ_MAX_PARTS,
HeaderValue::from_str(&GET_OBJECT_ATTRIBUTES_MAX_PARTS.to_string()).unwrap(),
);
}
/*if opts.server_side_encryption.is_some() {
@@ -180,32 +193,31 @@ impl TransitionClient {
}*/
let mut resp = self
.execute_method(
http::Method::HEAD,
&mut RequestMetadata {
bucket_name: bucket_name.to_string(),
object_name: object_name.to_string(),
query_values: url_values,
custom_header: headers,
content_sha256_hex: EMPTY_STRING_SHA256_HASH.to_string(),
content_md5_base64: "".to_string(),
content_body: ReaderImpl::Body(Bytes::new()),
content_length: 0,
stream_sha256: false,
trailer: HeaderMap::new(),
pre_sign_url: Default::default(),
add_crc: Default::default(),
extra_pre_sign_header: Default::default(),
bucket_location: Default::default(),
expires: Default::default(),
},
)
.execute_method(http::Method::HEAD, &mut RequestMetadata {
bucket_name: bucket_name.to_string(),
object_name: object_name.to_string(),
query_values: url_values,
custom_header: headers,
content_sha256_hex: EMPTY_STRING_SHA256_HASH.to_string(),
content_md5_base64: "".to_string(),
content_body: ReaderImpl::Body(Bytes::new()),
content_length: 0,
stream_sha256: false,
trailer: HeaderMap::new(),
pre_sign_url: Default::default(),
add_crc: Default::default(),
extra_pre_sign_header: Default::default(),
bucket_location: Default::default(),
expires: Default::default(),
})
.await?;
let h = resp.headers();
let has_etag = h.get("ETag").unwrap().to_str().unwrap();
if !has_etag.is_empty() {
return Err(std::io::Error::other("get_object_attributes is not supported by the current endpoint version"));
return Err(std::io::Error::other(
"get_object_attributes is not supported by the current endpoint version",
));
}
if resp.status() != http::StatusCode::OK {
@@ -226,4 +238,4 @@ impl TransitionClient {
Ok(oa)
}
}
}

View File

@@ -21,15 +21,15 @@
use bytes::Bytes;
use http::HeaderMap;
use std::io::Cursor;
use tokio::io::BufReader;
#[cfg(not(windows))]
use std::os::unix::fs::PermissionsExt;
use std::os::unix::fs::MetadataExt;
#[cfg(not(windows))]
use std::os::unix::fs::OpenOptionsExt;
#[cfg(not(windows))]
use std::os::unix::fs::MetadataExt;
use std::os::unix::fs::PermissionsExt;
#[cfg(windows)]
use std::os::windows::fs::MetadataExt;
use tokio::io::BufReader;
use crate::client::{
api_error_response::err_invalid_argument,
@@ -38,14 +38,20 @@ use crate::client::{
};
impl TransitionClient {
pub async fn fget_object(&self, bucket_name: &str, object_name: &str, file_path: &str, opts: GetObjectOptions) -> Result<(), std::io::Error> {
pub async fn fget_object(
&self,
bucket_name: &str,
object_name: &str,
file_path: &str,
opts: GetObjectOptions,
) -> Result<(), std::io::Error> {
match std::fs::metadata(file_path) {
Ok(file_path_stat) => {
let ft = file_path_stat.file_type();
if ft.is_dir() {
return Err(std::io::Error::other(err_invalid_argument("filename is a directory.")));
}
},
}
Err(err) => {
return Err(std::io::Error::other(err));
}
@@ -77,7 +83,7 @@ impl TransitionClient {
};
let mut file_part_path = file_path.to_string();
file_part_path.push_str(""/*sum_sha256_hex(object_stat.etag.as_bytes())*/);
file_part_path.push_str("" /*sum_sha256_hex(object_stat.etag.as_bytes())*/);
file_part_path.push_str(".part.rustfs");
#[cfg(not(windows))]
@@ -138,4 +144,4 @@ impl TransitionClient {
Ok(())
}
}
}

View File

@@ -76,26 +76,23 @@ impl TransitionClient {
}
let mut resp = self
.execute_method(
http::Method::GET,
&mut RequestMetadata {
bucket_name: bucket_name.to_string(),
object_name: "".to_string(),
query_values: url_values,
content_sha256_hex: EMPTY_STRING_SHA256_HASH.to_string(),
custom_header: headers,
content_body: ReaderImpl::Body(Bytes::new()),
content_length: 0,
content_md5_base64: "".to_string(),
stream_sha256: false,
trailer: HeaderMap::new(),
pre_sign_url: Default::default(),
add_crc: Default::default(),
extra_pre_sign_header: Default::default(),
bucket_location: Default::default(),
expires: Default::default(),
},
)
.execute_method(http::Method::GET, &mut RequestMetadata {
bucket_name: bucket_name.to_string(),
object_name: "".to_string(),
query_values: url_values,
content_sha256_hex: EMPTY_STRING_SHA256_HASH.to_string(),
custom_header: headers,
content_body: ReaderImpl::Body(Bytes::new()),
content_length: 0,
content_md5_base64: "".to_string(),
stream_sha256: false,
trailer: HeaderMap::new(),
pre_sign_url: Default::default(),
add_crc: Default::default(),
extra_pre_sign_header: Default::default(),
bucket_location: Default::default(),
expires: Default::default(),
})
.await?;
if resp.status() != StatusCode::OK {
return Err(std::io::Error::other(http_resp_to_error_response(resp, vec![], bucket_name, "")));

View File

@@ -78,26 +78,23 @@ impl TransitionClient {
let headers = HeaderMap::new();
let resp = self
.execute_method(
Method::DELETE,
&mut RequestMetadata {
bucket_name: bucket_name.to_string(),
content_sha256_hex: EMPTY_STRING_SHA256_HASH.to_string(),
custom_header: headers,
object_name: "".to_string(),
query_values: Default::default(),
content_body: ReaderImpl::Body(Bytes::new()),
content_length: 0,
content_md5_base64: "".to_string(),
stream_sha256: false,
trailer: HeaderMap::new(),
pre_sign_url: Default::default(),
add_crc: Default::default(),
extra_pre_sign_header: Default::default(),
bucket_location: Default::default(),
expires: Default::default(),
},
)
.execute_method(Method::DELETE, &mut RequestMetadata {
bucket_name: bucket_name.to_string(),
content_sha256_hex: EMPTY_STRING_SHA256_HASH.to_string(),
custom_header: headers,
object_name: "".to_string(),
query_values: Default::default(),
content_body: ReaderImpl::Body(Bytes::new()),
content_length: 0,
content_md5_base64: "".to_string(),
stream_sha256: false,
trailer: HeaderMap::new(),
pre_sign_url: Default::default(),
add_crc: Default::default(),
extra_pre_sign_header: Default::default(),
bucket_location: Default::default(),
expires: Default::default(),
})
.await?;
{
@@ -109,26 +106,23 @@ impl TransitionClient {
pub async fn remove_bucket(&self, bucket_name: &str) -> Result<(), std::io::Error> {
let resp = self
.execute_method(
http::Method::DELETE,
&mut RequestMetadata {
bucket_name: bucket_name.to_string(),
content_sha256_hex: EMPTY_STRING_SHA256_HASH.to_string(),
custom_header: Default::default(),
object_name: "".to_string(),
query_values: Default::default(),
content_body: ReaderImpl::Body(Bytes::new()),
content_length: 0,
content_md5_base64: "".to_string(),
stream_sha256: false,
trailer: HeaderMap::new(),
pre_sign_url: Default::default(),
add_crc: Default::default(),
extra_pre_sign_header: Default::default(),
bucket_location: Default::default(),
expires: Default::default(),
},
)
.execute_method(http::Method::DELETE, &mut RequestMetadata {
bucket_name: bucket_name.to_string(),
content_sha256_hex: EMPTY_STRING_SHA256_HASH.to_string(),
custom_header: Default::default(),
object_name: "".to_string(),
query_values: Default::default(),
content_body: ReaderImpl::Body(Bytes::new()),
content_length: 0,
content_md5_base64: "".to_string(),
stream_sha256: false,
trailer: HeaderMap::new(),
pre_sign_url: Default::default(),
add_crc: Default::default(),
extra_pre_sign_header: Default::default(),
bucket_location: Default::default(),
expires: Default::default(),
})
.await?;
{
@@ -163,26 +157,23 @@ impl TransitionClient {
}
let resp = self
.execute_method(
http::Method::DELETE,
&mut RequestMetadata {
bucket_name: bucket_name.to_string(),
object_name: object_name.to_string(),
content_sha256_hex: EMPTY_STRING_SHA256_HASH.to_string(),
query_values: url_values,
custom_header: headers,
content_body: ReaderImpl::Body(Bytes::new()),
content_length: 0,
content_md5_base64: "".to_string(),
stream_sha256: false,
trailer: HeaderMap::new(),
pre_sign_url: Default::default(),
add_crc: Default::default(),
extra_pre_sign_header: Default::default(),
bucket_location: Default::default(),
expires: Default::default(),
},
)
.execute_method(http::Method::DELETE, &mut RequestMetadata {
bucket_name: bucket_name.to_string(),
object_name: object_name.to_string(),
content_sha256_hex: EMPTY_STRING_SHA256_HASH.to_string(),
query_values: url_values,
custom_header: headers,
content_body: ReaderImpl::Body(Bytes::new()),
content_length: 0,
content_md5_base64: "".to_string(),
stream_sha256: false,
trailer: HeaderMap::new(),
pre_sign_url: Default::default(),
add_crc: Default::default(),
extra_pre_sign_header: Default::default(),
bucket_location: Default::default(),
expires: Default::default(),
})
.await?;
Ok(RemoveObjectResult {
@@ -277,15 +268,11 @@ impl TransitionClient {
while let Some(object) = objects_rx.recv().await {
if has_invalid_xml_char(&object.name) {
let remove_result = self
.remove_object_inner(
bucket_name,
&object.name,
RemoveObjectOptions {
version_id: object.version_id.expect("err").to_string(),
governance_bypass: opts.governance_bypass,
..Default::default()
},
)
.remove_object_inner(bucket_name, &object.name, RemoveObjectOptions {
version_id: object.version_id.expect("err").to_string(),
governance_bypass: opts.governance_bypass,
..Default::default()
})
.await?;
let remove_result_clone = remove_result.clone();
if !remove_result.err.is_none() {
@@ -322,26 +309,23 @@ impl TransitionClient {
let remove_bytes = generate_remove_multi_objects_request(&batch);
let resp = self
.execute_method(
http::Method::POST,
&mut RequestMetadata {
bucket_name: bucket_name.to_string(),
query_values: url_values.clone(),
content_body: ReaderImpl::Body(Bytes::from(remove_bytes.clone())),
content_length: remove_bytes.len() as i64,
content_md5_base64: base64_encode(&HashAlgorithm::Md5.hash_encode(&remove_bytes).as_ref()),
content_sha256_hex: base64_encode(&HashAlgorithm::SHA256.hash_encode(&remove_bytes).as_ref()),
custom_header: headers,
object_name: "".to_string(),
stream_sha256: false,
trailer: HeaderMap::new(),
pre_sign_url: Default::default(),
add_crc: Default::default(),
extra_pre_sign_header: Default::default(),
bucket_location: Default::default(),
expires: Default::default(),
},
)
.execute_method(http::Method::POST, &mut RequestMetadata {
bucket_name: bucket_name.to_string(),
query_values: url_values.clone(),
content_body: ReaderImpl::Body(Bytes::from(remove_bytes.clone())),
content_length: remove_bytes.len() as i64,
content_md5_base64: base64_encode(&HashAlgorithm::Md5.hash_encode(&remove_bytes).as_ref()),
content_sha256_hex: base64_encode(&HashAlgorithm::SHA256.hash_encode(&remove_bytes).as_ref()),
custom_header: headers,
object_name: "".to_string(),
stream_sha256: false,
trailer: HeaderMap::new(),
pre_sign_url: Default::default(),
add_crc: Default::default(),
extra_pre_sign_header: Default::default(),
bucket_location: Default::default(),
expires: Default::default(),
})
.await?;
let body_bytes: Vec<u8> = resp.body().bytes().expect("err").to_vec();
@@ -369,26 +353,23 @@ impl TransitionClient {
url_values.insert("uploadId".to_string(), upload_id.to_string());
let resp = self
.execute_method(
http::Method::DELETE,
&mut RequestMetadata {
bucket_name: bucket_name.to_string(),
object_name: object_name.to_string(),
query_values: url_values,
content_sha256_hex: EMPTY_STRING_SHA256_HASH.to_string(),
custom_header: HeaderMap::new(),
content_body: ReaderImpl::Body(Bytes::new()),
content_length: 0,
content_md5_base64: "".to_string(),
stream_sha256: false,
trailer: HeaderMap::new(),
pre_sign_url: Default::default(),
add_crc: Default::default(),
extra_pre_sign_header: Default::default(),
bucket_location: Default::default(),
expires: Default::default(),
},
)
.execute_method(http::Method::DELETE, &mut RequestMetadata {
bucket_name: bucket_name.to_string(),
object_name: object_name.to_string(),
query_values: url_values,
content_sha256_hex: EMPTY_STRING_SHA256_HASH.to_string(),
custom_header: HeaderMap::new(),
content_body: ReaderImpl::Body(Bytes::new()),
content_length: 0,
content_md5_base64: "".to_string(),
stream_sha256: false,
trailer: HeaderMap::new(),
pre_sign_url: Default::default(),
add_crc: Default::default(),
extra_pre_sign_header: Default::default(),
bucket_location: Default::default(),
expires: Default::default(),
})
.await?;
//if resp.is_some() {
if resp.status() != StatusCode::NO_CONTENT {

View File

@@ -20,12 +20,15 @@
use bytes::Bytes;
use http::HeaderMap;
use std::collections::HashMap;
use std::io::Cursor;
use tokio::io::BufReader;
use std::collections::HashMap;
use crate::client::{
api_error_response::{err_invalid_argument, http_resp_to_error_response}, api_get_object_acl::AccessControlList, api_get_options::GetObjectOptions, transition_api::{to_object_info, ObjectInfo, ReadCloser, ReaderImpl, RequestMetadata, TransitionClient}
api_error_response::{err_invalid_argument, http_resp_to_error_response},
api_get_object_acl::AccessControlList,
api_get_options::GetObjectOptions,
transition_api::{ObjectInfo, ReadCloser, ReaderImpl, RequestMetadata, TransitionClient, to_object_info},
};
const TIER_STANDARD: &str = "Standard";
@@ -33,55 +36,55 @@ const TIER_BULK: &str = "Bulk";
const TIER_EXPEDITED: &str = "Expedited";
#[derive(Debug, Default, serde::Serialize)]
struct GlacierJobParameters {
tier: String,
pub struct GlacierJobParameters {
pub tier: String,
}
#[derive(Debug, Default, serde::Serialize, serde::Deserialize)]
struct Encryption {
encryption_type: String,
kms_context: String,
kms_key_id: String,
pub struct Encryption {
pub encryption_type: String,
pub kms_context: String,
pub kms_key_id: String,
}
#[derive(Debug, Default, serde::Serialize, serde::Deserialize)]
struct MetadataEntry {
name: String,
value: String,
pub struct MetadataEntry {
pub name: String,
pub value: String,
}
#[derive(Debug, Default, serde::Serialize)]
struct S3 {
access_control_list: AccessControlList,
bucket_name: String,
prefix: String,
canned_acl: String,
encryption: Encryption,
storage_class: String,
pub struct S3 {
pub access_control_list: AccessControlList,
pub bucket_name: String,
pub prefix: String,
pub canned_acl: String,
pub encryption: Encryption,
pub storage_class: String,
//tagging: Tags,
user_metadata: MetadataEntry,
pub user_metadata: MetadataEntry,
}
#[derive(Debug, Default, serde::Serialize)]
struct SelectParameters {
expression_type: String,
expression: String,
pub struct SelectParameters {
pub expression_type: String,
pub expression: String,
//input_serialization: SelectObjectInputSerialization,
//output_serialization: SelectObjectOutputSerialization,
}
#[derive(Debug, Default, serde::Serialize)]
struct OutputLocation(S3);
pub struct OutputLocation(pub S3);
#[derive(Debug, Default, serde::Serialize)]
struct RestoreRequest {
restore_type: String,
tier: String,
days: i64,
glacier_job_parameters: GlacierJobParameters,
description: String,
select_parameters: SelectParameters,
output_location: OutputLocation,
pub struct RestoreRequest {
pub restore_type: String,
pub tier: String,
pub days: i64,
pub glacier_job_parameters: GlacierJobParameters,
pub description: String,
pub select_parameters: SelectParameters,
pub output_location: OutputLocation,
}
impl RestoreRequest {
@@ -115,7 +118,13 @@ impl RestoreRequest {
}
impl TransitionClient {
pub async fn restore_object(&self, bucket_name: &str, object_name: &str, version_id: &str, restore_req: &RestoreRequest) -> Result<(), std::io::Error> {
pub async fn restore_object(
&self,
bucket_name: &str,
object_name: &str,
version_id: &str,
restore_req: &RestoreRequest,
) -> Result<(), std::io::Error> {
let restore_request = match serde_xml_rs::to_string(restore_req) {
Ok(buf) => buf,
Err(e) => {
@@ -132,26 +141,23 @@ impl TransitionClient {
let restore_request_buffer = Bytes::from(restore_request_bytes.clone());
let resp = self
.execute_method(
http::Method::HEAD,
&mut RequestMetadata {
bucket_name: bucket_name.to_string(),
object_name: object_name.to_string(),
query_values: url_values,
custom_header: HeaderMap::new(),
content_sha256_hex: "".to_string(), //sum_sha256_hex(&restore_request_bytes),
content_md5_base64: "".to_string(), //sum_md5_base64(&restore_request_bytes),
content_body: ReaderImpl::Body(restore_request_buffer),
content_length: restore_request_bytes.len() as i64,
stream_sha256: false,
trailer: HeaderMap::new(),
pre_sign_url: Default::default(),
add_crc: Default::default(),
extra_pre_sign_header: Default::default(),
bucket_location: Default::default(),
expires: Default::default(),
},
)
.execute_method(http::Method::HEAD, &mut RequestMetadata {
bucket_name: bucket_name.to_string(),
object_name: object_name.to_string(),
query_values: url_values,
custom_header: HeaderMap::new(),
content_sha256_hex: "".to_string(), //sum_sha256_hex(&restore_request_bytes),
content_md5_base64: "".to_string(), //sum_md5_base64(&restore_request_bytes),
content_body: ReaderImpl::Body(restore_request_buffer),
content_length: restore_request_bytes.len() as i64,
stream_sha256: false,
trailer: HeaderMap::new(),
pre_sign_url: Default::default(),
add_crc: Default::default(),
extra_pre_sign_header: Default::default(),
bucket_location: Default::default(),
expires: Default::default(),
})
.await?;
let b = resp.body().bytes().expect("err").to_vec();
@@ -160,4 +166,4 @@ impl TransitionClient {
}
Ok(())
}
}
}

View File

@@ -21,40 +21,37 @@
use bytes::Bytes;
use http::{HeaderMap, HeaderValue};
use rustfs_utils::EMPTY_STRING_SHA256_HASH;
use uuid::Uuid;
use std::{collections::HashMap, str::FromStr};
use tokio::io::BufReader;
use uuid::Uuid;
use s3s::header::{X_AMZ_DELETE_MARKER, X_AMZ_VERSION_ID};
use crate::client::{
api_error_response::{err_invalid_argument, http_resp_to_error_response, ErrorResponse},
api_error_response::{ErrorResponse, err_invalid_argument, http_resp_to_error_response},
api_get_options::GetObjectOptions,
transition_api::{to_object_info, ObjectInfo, ReadCloser, ReaderImpl, RequestMetadata, TransitionClient},
transition_api::{ObjectInfo, ReadCloser, ReaderImpl, RequestMetadata, TransitionClient, to_object_info},
};
use s3s::header::{X_AMZ_DELETE_MARKER, X_AMZ_VERSION_ID};
impl TransitionClient {
pub async fn bucket_exists(&self, bucket_name: &str) -> Result<bool, std::io::Error> {
let resp = self
.execute_method(
http::Method::HEAD,
&mut RequestMetadata {
bucket_name: bucket_name.to_string(),
object_name: "".to_string(),
query_values: HashMap::new(),
custom_header: HeaderMap::new(),
content_sha256_hex: EMPTY_STRING_SHA256_HASH.to_string(),
content_md5_base64: "".to_string(),
content_body: ReaderImpl::Body(Bytes::new()),
content_length: 0,
stream_sha256: false,
trailer: HeaderMap::new(),
pre_sign_url: Default::default(),
add_crc: Default::default(),
extra_pre_sign_header: Default::default(),
bucket_location: Default::default(),
expires: Default::default(),
},
)
.execute_method(http::Method::HEAD, &mut RequestMetadata {
bucket_name: bucket_name.to_string(),
object_name: "".to_string(),
query_values: HashMap::new(),
custom_header: HeaderMap::new(),
content_sha256_hex: EMPTY_STRING_SHA256_HASH.to_string(),
content_md5_base64: "".to_string(),
content_body: ReaderImpl::Body(Bytes::new()),
content_length: 0,
stream_sha256: false,
trailer: HeaderMap::new(),
pre_sign_url: Default::default(),
add_crc: Default::default(),
extra_pre_sign_header: Default::default(),
bucket_location: Default::default(),
expires: Default::default(),
})
.await;
if let Ok(resp) = resp {
@@ -70,7 +67,12 @@ impl TransitionClient {
Ok(true)
}
pub async fn stat_object(&self, bucket_name: &str, object_name: &str, opts: &GetObjectOptions) -> Result<ObjectInfo, std::io::Error> {
pub async fn stat_object(
&self,
bucket_name: &str,
object_name: &str,
opts: &GetObjectOptions,
) -> Result<ObjectInfo, std::io::Error> {
let mut headers = opts.header();
if opts.internal.replication_delete_marker {
headers.insert("X-Source-DeleteMarker", HeaderValue::from_str("true").unwrap());
@@ -80,26 +82,23 @@ impl TransitionClient {
}
let resp = self
.execute_method(
http::Method::HEAD,
&mut RequestMetadata {
bucket_name: bucket_name.to_string(),
object_name: object_name.to_string(),
query_values: opts.to_query_values(),
custom_header: headers,
content_sha256_hex: EMPTY_STRING_SHA256_HASH.to_string(),
content_md5_base64: "".to_string(),
content_body: ReaderImpl::Body(Bytes::new()),
content_length: 0,
stream_sha256: false,
trailer: HeaderMap::new(),
pre_sign_url: Default::default(),
add_crc: Default::default(),
extra_pre_sign_header: Default::default(),
bucket_location: Default::default(),
expires: Default::default(),
},
)
.execute_method(http::Method::HEAD, &mut RequestMetadata {
bucket_name: bucket_name.to_string(),
object_name: object_name.to_string(),
query_values: opts.to_query_values(),
custom_header: headers,
content_sha256_hex: EMPTY_STRING_SHA256_HASH.to_string(),
content_md5_base64: "".to_string(),
content_body: ReaderImpl::Body(Bytes::new()),
content_length: 0,
stream_sha256: false,
trailer: HeaderMap::new(),
pre_sign_url: Default::default(),
add_crc: Default::default(),
extra_pre_sign_header: Default::default(),
bucket_location: Default::default(),
expires: Default::default(),
})
.await;
match resp {
@@ -107,24 +106,30 @@ impl TransitionClient {
let h = resp.headers();
let delete_marker = if let Some(x_amz_delete_marker) = h.get(X_AMZ_DELETE_MARKER.as_str()) {
x_amz_delete_marker.to_str().unwrap() == "true"
} else { false };
} else {
false
};
let replication_ready = if let Some(x_amz_delete_marker) = h.get("X-Replication-Ready") {
x_amz_delete_marker.to_str().unwrap() == "true"
} else { false };
} else {
false
};
if resp.status() != http::StatusCode::OK && resp.status() != http::StatusCode::PARTIAL_CONTENT {
if resp.status() == http::StatusCode::METHOD_NOT_ALLOWED && opts.version_id != "" && delete_marker {
let err_resp = ErrorResponse {
status_code: resp.status(),
code: s3s::S3ErrorCode::MethodNotAllowed,
message: "the specified method is not allowed against this resource.".to_string(),
code: s3s::S3ErrorCode::MethodNotAllowed,
message: "the specified method is not allowed against this resource.".to_string(),
bucket_name: bucket_name.to_string(),
key: object_name.to_string(),
key: object_name.to_string(),
..Default::default()
};
return Ok(ObjectInfo {
version_id: match Uuid::from_str(h.get(X_AMZ_VERSION_ID).unwrap().to_str().unwrap()) {
version_id: match Uuid::from_str(h.get(X_AMZ_VERSION_ID).unwrap().to_str().unwrap()) {
Ok(v) => v,
Err(e) => { return Err(std::io::Error::other(e)); }
Err(e) => {
return Err(std::io::Error::other(e));
}
},
is_delete_marker: delete_marker,
..Default::default()
@@ -132,12 +137,14 @@ impl TransitionClient {
//err_resp
}
return Ok(ObjectInfo {
version_id: match Uuid::from_str(h.get(X_AMZ_VERSION_ID).unwrap().to_str().unwrap()) {
version_id: match Uuid::from_str(h.get(X_AMZ_VERSION_ID).unwrap().to_str().unwrap()) {
Ok(v) => v,
Err(e) => { return Err(std::io::Error::other(e)); }
Err(e) => {
return Err(std::io::Error::other(e));
}
},
is_delete_marker: delete_marker,
replication_ready: replication_ready,
is_delete_marker: delete_marker,
replication_ready: replication_ready,
..Default::default()
});
//http_resp_to_error_response(resp, bucket_name, object_name)

View File

@@ -167,8 +167,7 @@ impl TransitionClient {
content_sha256 = UNSIGNED_PAYLOAD.to_string();
}
req
.headers_mut()
req.headers_mut()
.insert("X-Amz-Content-Sha256", content_sha256.parse().unwrap());
let req = rustfs_signer::sign_v4(req, 0, &access_key_id, &secret_access_key, &session_token, "us-east-1");
Ok(req)

View File

@@ -16,6 +16,9 @@ pub mod admin_handler_utils;
pub mod api_bucket_policy;
pub mod api_error_response;
pub mod api_get_object;
pub mod api_get_object_acl;
pub mod api_get_object_attributes;
pub mod api_get_object_file;
pub mod api_get_options;
pub mod api_list;
pub mod api_put_object;
@@ -24,11 +27,8 @@ pub mod api_put_object_multipart;
pub mod api_put_object_streaming;
pub mod api_remove;
pub mod api_restore;
pub mod api_stat;
pub mod api_get_object_acl;
pub mod api_get_object_attributes;
pub mod api_get_object_file;
pub mod api_s3_datatypes;
pub mod api_stat;
pub mod bucket_cache;
pub mod constants;
pub mod credentials;

View File

@@ -31,14 +31,10 @@ pub async fn delete_object_versions(api: ECStore, bucket: &str, to_del: &[Object
remaining = &[];
}
let vc = BucketVersioningSys::get(bucket).await.expect("err!");
let _deleted_objs = api.delete_objects(
bucket,
to_del.to_vec(),
ObjectOptions {
//prefix_enabled_fn: vc.prefix_enabled(""),
version_suspended: vc.suspended(),
..Default::default()
},
);
let _deleted_objs = api.delete_objects(bucket, to_del.to_vec(), ObjectOptions {
//prefix_enabled_fn: vc.prefix_enabled(""),
version_suspended: vc.suspended(),
..Default::default()
});
}
}

View File

@@ -387,7 +387,11 @@ impl TransitionClient {
&metadata.query_values,
)?;
let Ok(mut req) = Request::builder().method(method).uri(target_url.to_string()).body(Body::empty()) else {
let Ok(mut req) = Request::builder()
.method(method)
.uri(target_url.to_string())
.body(Body::empty())
else {
return Err(std::io::Error::other("create request error"));
};
@@ -428,13 +432,7 @@ impl TransitionClient {
}
}
if signer_type == SignatureType::SignatureV2 {
req = rustfs_signer::pre_sign_v2(
req,
&access_key_id,
&secret_access_key,
metadata.expires,
is_virtual_host,
);
req = rustfs_signer::pre_sign_v2(req, &access_key_id, &secret_access_key, metadata.expires, is_virtual_host);
} else if signer_type == SignatureType::SignatureV4 {
req = rustfs_signer::pre_sign_v4(
req,
@@ -458,9 +456,7 @@ impl TransitionClient {
//req.content_length = metadata.content_length;
if metadata.content_length <= -1 {
let chunked_value = HeaderValue::from_str(&vec!["chunked"].join(",")).expect("err");
req
.headers_mut()
.insert(http::header::TRANSFER_ENCODING, chunked_value);
req.headers_mut().insert(http::header::TRANSFER_ENCODING, chunked_value);
}
if metadata.content_md5_base64.len() > 0 {
@@ -473,8 +469,7 @@ impl TransitionClient {
}
if signer_type == SignatureType::SignatureV2 {
req =
rustfs_signer::sign_v2(req, metadata.content_length, &access_key_id, &secret_access_key, is_virtual_host);
req = rustfs_signer::sign_v2(req, metadata.content_length, &access_key_id, &secret_access_key, is_virtual_host);
} else if metadata.stream_sha256 && !self.secure {
if metadata.trailer.len() > 0 {
for (_, v) in &metadata.trailer {
@@ -491,8 +486,8 @@ impl TransitionClient {
} else if metadata.trailer.len() > 0 {
sha_header = UNSIGNED_PAYLOAD_TRAILER.to_string();
}
req
.headers_mut().insert("X-Amz-Content-Sha256".parse::<HeaderName>().unwrap(), sha_header.parse().expect("err"));
req.headers_mut()
.insert("X-Amz-Content-Sha256".parse::<HeaderName>().unwrap(), sha_header.parse().expect("err"));
req = rustfs_signer::sign_v4_trailer(
req,
@@ -516,7 +511,7 @@ impl TransitionClient {
}
Ok(req)
}
}
pub fn set_user_agent(&self, req: &mut Request<Body>) {
let headers = req.headers_mut();

View File

@@ -70,29 +70,20 @@ pub async fn read_config_with_metadata<S: StorageAPI>(
}
pub async fn save_config<S: StorageAPI>(api: Arc<S>, file: &str, data: Vec<u8>) -> Result<()> {
save_config_with_opts(
api,
file,
data,
&ObjectOptions {
max_parity: true,
..Default::default()
},
)
save_config_with_opts(api, file, data, &ObjectOptions {
max_parity: true,
..Default::default()
})
.await
}
pub async fn delete_config<S: StorageAPI>(api: Arc<S>, file: &str) -> Result<()> {
match api
.delete_object(
RUSTFS_META_BUCKET,
file,
ObjectOptions {
delete_prefix: true,
delete_prefix_object: true,
..Default::default()
},
)
.delete_object(RUSTFS_META_BUCKET, file, ObjectOptions {
delete_prefix: true,
delete_prefix_object: true,
..Default::default()
})
.await
{
Ok(_) => Ok(()),

View File

@@ -2026,15 +2026,11 @@ impl DiskAPI for LocalDisk {
) -> Result<()> {
if path.starts_with(SLASH_SEPARATOR) {
return self
.delete(
volume,
path,
DeleteOptions {
recursive: false,
immediate: false,
..Default::default()
},
)
.delete(volume, path, DeleteOptions {
recursive: false,
immediate: false,
..Default::default()
})
.await;
}

View File

@@ -317,13 +317,10 @@ enum WriterType {
impl std::fmt::Debug for BitrotWriterWrapper {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("BitrotWriterWrapper")
.field(
"writer_type",
&match self.writer_type {
WriterType::InlineBuffer => "InlineBuffer",
WriterType::Other => "Other",
},
)
.field("writer_type", &match self.writer_type {
WriterType::InlineBuffer => "InlineBuffer",
WriterType::Other => "Other",
})
.finish()
}
}

View File

@@ -174,13 +174,10 @@ pub async fn load_data_usage_from_backend(store: Arc<ECStore>) -> Result<DataUsa
.bucket_sizes
.iter()
.map(|(bucket, &size)| {
(
bucket.clone(),
BucketUsageInfo {
size,
..Default::default()
},
)
(bucket.clone(), BucketUsageInfo {
size,
..Default::default()
})
})
.collect();
}
@@ -201,17 +198,16 @@ pub async fn load_data_usage_from_backend(store: Arc<ECStore>) -> Result<DataUsa
{
if let Ok((cfg, _)) = get_replication_config(bucket).await {
if !cfg.role.is_empty() {
data_usage_info.replication_info.insert(
cfg.role.clone(),
BucketTargetUsageInfo {
data_usage_info
.replication_info
.insert(cfg.role.clone(), BucketTargetUsageInfo {
replication_failed_size: bui.replication_failed_size_v1,
replication_failed_count: bui.replication_failed_count_v1,
replicated_size: bui.replicated_size_v1,
replication_pending_count: bui.replication_pending_count_v1,
replication_pending_size: bui.replication_pending_size_v1,
..Default::default()
},
);
});
}
}
}

View File

@@ -183,14 +183,11 @@ impl AllTierStats {
fn populate_stats(&self, stats: &mut HashMap<String, TierStats>) {
for (tier, st) in &self.tiers {
stats.insert(
tier.clone(),
TierStats {
total_size: st.total_size,
num_versions: st.num_versions,
num_objects: st.num_objects,
},
);
stats.insert(tier.clone(), TierStats {
total_size: st.total_size,
num_versions: st.num_versions,
num_objects: st.num_objects,
});
}
}
}
@@ -446,16 +443,10 @@ impl DataUsageCache {
let path = Path::new(BUCKET_META_PREFIX).join(name);
// warn!("Loading data usage cache from backend: {}", path.display());
match store
.get_object_reader(
RUSTFS_META_BUCKET,
path.to_str().unwrap(),
None,
HeaderMap::new(),
&ObjectOptions {
no_lock: true,
..Default::default()
},
)
.get_object_reader(RUSTFS_META_BUCKET, path.to_str().unwrap(), None, HeaderMap::new(), &ObjectOptions {
no_lock: true,
..Default::default()
})
.await
{
Ok(mut reader) => {
@@ -469,16 +460,10 @@ impl DataUsageCache {
match err {
Error::FileNotFound | Error::VolumeNotFound => {
match store
.get_object_reader(
RUSTFS_META_BUCKET,
name,
None,
HeaderMap::new(),
&ObjectOptions {
no_lock: true,
..Default::default()
},
)
.get_object_reader(RUSTFS_META_BUCKET, name, None, HeaderMap::new(), &ObjectOptions {
no_lock: true,
..Default::default()
})
.await
{
Ok(mut reader) => {
@@ -819,18 +804,15 @@ impl DataUsageCache {
bui.replica_count = rs.replica_count;
for (arn, stat) in rs.targets.iter() {
bui.replication_info.insert(
arn.clone(),
BucketTargetUsageInfo {
replication_pending_size: stat.pending_size,
replicated_size: stat.replicated_size,
replication_failed_size: stat.failed_size,
replication_pending_count: stat.pending_count,
replication_failed_count: stat.failed_count,
replicated_count: stat.replicated_count,
..Default::default()
},
);
bui.replication_info.insert(arn.clone(), BucketTargetUsageInfo {
replication_pending_size: stat.pending_size,
replicated_size: stat.replicated_size,
replication_failed_size: stat.failed_size,
replication_pending_count: stat.pending_count,
replication_failed_count: stat.failed_count,
replicated_count: stat.replicated_count,
..Default::default()
});
}
}
dst.insert(bucket.name.clone(), bui);

View File

@@ -255,15 +255,11 @@ impl HealingTracker {
pub async fn delete(&self) -> Result<()> {
if let Some(disk) = &self.disk {
let file_path = Path::new(BUCKET_META_PREFIX).join(HEALING_TRACKER_FILENAME);
disk.delete(
RUSTFS_META_BUCKET,
file_path.to_str().unwrap(),
DeleteOptions {
recursive: false,
immediate: false,
..Default::default()
},
)
disk.delete(RUSTFS_META_BUCKET, file_path.to_str().unwrap(), DeleteOptions {
recursive: false,
immediate: false,
..Default::default()
})
.await?;
}

View File

@@ -148,14 +148,11 @@ async fn collect_local_disks_metrics(disks: &HashSet<String>) -> HashMap<String,
}
if d.state != *DRIVE_STATE_OK && d.state != *DRIVE_STATE_UNFORMATTED {
metrics.insert(
d.endpoint.clone(),
DiskMetric {
n_disks: 1,
offline: 1,
..Default::default()
},
);
metrics.insert(d.endpoint.clone(), DiskMetric {
n_disks: 1,
offline: 1,
..Default::default()
});
continue;
}

View File

@@ -367,14 +367,11 @@ impl PoolMeta {
}
decom_started = true;
}
remembered_pools.insert(
pool.cmd_line.clone(),
PoolInfo {
position: idx,
completed: complete,
decom_started,
},
);
remembered_pools.insert(pool.cmd_line.clone(), PoolInfo {
position: idx,
completed: complete,
decom_started,
});
}
let mut specified_pools = HashMap::new();
@@ -725,20 +722,16 @@ impl ECStore {
if version.deleted {
// TODO: other params
if let Err(err) = self
.delete_object(
bucket.as_str(),
&version.name,
ObjectOptions {
versioned: true,
version_id: version_id.clone(),
mod_time: version.mod_time,
src_pool_idx: idx,
data_movement: true,
delete_marker: true,
skip_decommissioned: true,
..Default::default()
},
)
.delete_object(bucket.as_str(), &version.name, ObjectOptions {
versioned: true,
version_id: version_id.clone(),
mod_time: version.mod_time,
src_pool_idx: idx,
data_movement: true,
delete_marker: true,
skip_decommissioned: true,
..Default::default()
})
.await
{
if is_err_object_not_found(&err) || is_err_version_not_found(&err) || is_err_data_movement_overwrite(&err) {
@@ -851,16 +844,12 @@ impl ECStore {
if decommissioned == fivs.versions.len() {
if let Err(err) = set
.delete_object(
bucket.as_str(),
&encode_dir_object(&entry.name),
ObjectOptions {
delete_prefix: true,
delete_prefix_object: true,
.delete_object(bucket.as_str(), &encode_dir_object(&entry.name), ObjectOptions {
delete_prefix: true,
delete_prefix_object: true,
..Default::default()
},
)
..Default::default()
})
.await
{
error!("decommission_pool: delete_object err {:?}", &err);
@@ -1198,17 +1187,13 @@ impl ECStore {
if object_info.is_multipart() {
let res = match self
.new_multipart_upload(
&bucket,
&object_info.name,
&ObjectOptions {
version_id: object_info.version_id.as_ref().map(|v| v.to_string()),
user_defined: object_info.user_defined.clone(),
src_pool_idx: pool_idx,
data_movement: true,
..Default::default()
},
)
.new_multipart_upload(&bucket, &object_info.name, &ObjectOptions {
version_id: object_info.version_id.as_ref().map(|v| v.to_string()),
user_defined: object_info.user_defined.clone(),
src_pool_idx: pool_idx,
data_movement: true,
..Default::default()
})
.await
{
Ok(res) => res,
@@ -1239,17 +1224,10 @@ impl ECStore {
let mut data = PutObjReader::from_vec(chunk);
let pi = match self
.put_object_part(
&bucket,
&object_info.name,
&res.upload_id,
part.number,
&mut data,
&ObjectOptions {
preserve_etag: Some(part.etag.clone()),
..Default::default()
},
)
.put_object_part(&bucket, &object_info.name, &res.upload_id, part.number, &mut data, &ObjectOptions {
preserve_etag: Some(part.etag.clone()),
..Default::default()
})
.await
{
Ok(pi) => pi,
@@ -1269,17 +1247,11 @@ impl ECStore {
if let Err(err) = self
.clone()
.complete_multipart_upload(
&bucket,
&object_info.name,
&res.upload_id,
parts,
&ObjectOptions {
data_movement: true,
mod_time: object_info.mod_time,
..Default::default()
},
)
.complete_multipart_upload(&bucket, &object_info.name, &res.upload_id, parts, &ObjectOptions {
data_movement: true,
mod_time: object_info.mod_time,
..Default::default()
})
.await
{
error!("decommission_object: complete_multipart_upload err {:?}", &err);
@@ -1295,21 +1267,16 @@ impl ECStore {
let mut data = PutObjReader::new(hrd);
if let Err(err) = self
.put_object(
&bucket,
&object_info.name,
&mut data,
&ObjectOptions {
src_pool_idx: pool_idx,
data_movement: true,
version_id: object_info.version_id.as_ref().map(|v| v.to_string()),
mod_time: object_info.mod_time,
user_defined: object_info.user_defined.clone(),
preserve_etag: object_info.etag.clone(),
.put_object(&bucket, &object_info.name, &mut data, &ObjectOptions {
src_pool_idx: pool_idx,
data_movement: true,
version_id: object_info.version_id.as_ref().map(|v| v.to_string()),
mod_time: object_info.mod_time,
user_defined: object_info.user_defined.clone(),
preserve_etag: object_info.etag.clone(),
..Default::default()
},
)
..Default::default()
})
.await
{
error!("decommission_object: put_object err {:?}", &err);
@@ -1349,34 +1316,31 @@ impl SetDisks {
let cb1 = cb_func.clone();
list_path_raw(
rx,
ListPathRawOptions {
disks: disks.iter().cloned().map(Some).collect(),
bucket: bucket_info.name.clone(),
path: bucket_info.prefix.clone(),
recursice: true,
min_disks: listing_quorum,
agreed: Some(Box::new(move |entry: MetaCacheEntry| Box::pin(cb1(entry)))),
partial: Some(Box::new(move |entries: MetaCacheEntries, _: &[Option<DiskError>]| {
let resolver = resolver.clone();
let cb_func = cb_func.clone();
match entries.resolve(resolver) {
Some(entry) => {
warn!("decommission_pool: list_objects_to_decommission get {}", &entry.name);
Box::pin(async move {
cb_func(entry).await;
})
}
None => {
warn!("decommission_pool: list_objects_to_decommission get none");
Box::pin(async {})
}
list_path_raw(rx, ListPathRawOptions {
disks: disks.iter().cloned().map(Some).collect(),
bucket: bucket_info.name.clone(),
path: bucket_info.prefix.clone(),
recursice: true,
min_disks: listing_quorum,
agreed: Some(Box::new(move |entry: MetaCacheEntry| Box::pin(cb1(entry)))),
partial: Some(Box::new(move |entries: MetaCacheEntries, _: &[Option<DiskError>]| {
let resolver = resolver.clone();
let cb_func = cb_func.clone();
match entries.resolve(resolver) {
Some(entry) => {
warn!("decommission_pool: list_objects_to_decommission get {}", &entry.name);
Box::pin(async move {
cb_func(entry).await;
})
}
})),
..Default::default()
},
)
None => {
warn!("decommission_pool: list_objects_to_decommission get none");
Box::pin(async {})
}
}
})),
..Default::default()
})
.await?;
Ok(())

View File

@@ -774,20 +774,16 @@ impl ECStore {
let mut error = None;
if version.deleted {
if let Err(err) = set
.delete_object(
&bucket,
&version.name,
ObjectOptions {
versioned: true,
version_id: version_id.clone(),
mod_time: version.mod_time,
src_pool_idx: pool_index,
data_movement: true,
delete_marker: true,
skip_decommissioned: true,
..Default::default()
},
)
.delete_object(&bucket, &version.name, ObjectOptions {
versioned: true,
version_id: version_id.clone(),
mod_time: version.mod_time,
src_pool_idx: pool_index,
data_movement: true,
delete_marker: true,
skip_decommissioned: true,
..Default::default()
})
.await
{
if is_err_object_not_found(&err) || is_err_version_not_found(&err) || is_err_data_movement_overwrite(&err) {
@@ -883,16 +879,12 @@ impl ECStore {
if rebalanced == fivs.versions.len() {
if let Err(err) = set
.delete_object(
bucket.as_str(),
&encode_dir_object(&entry.name),
ObjectOptions {
delete_prefix: true,
delete_prefix_object: true,
.delete_object(bucket.as_str(), &encode_dir_object(&entry.name), ObjectOptions {
delete_prefix: true,
delete_prefix_object: true,
..Default::default()
},
)
..Default::default()
})
.await
{
error!("rebalance_entry: delete_object err {:?}", &err);
@@ -911,17 +903,13 @@ impl ECStore {
if object_info.is_multipart() {
let res = match self
.new_multipart_upload(
&bucket,
&object_info.name,
&ObjectOptions {
version_id: object_info.version_id.as_ref().map(|v| v.to_string()),
user_defined: object_info.user_defined.clone(),
src_pool_idx: pool_idx,
data_movement: true,
..Default::default()
},
)
.new_multipart_upload(&bucket, &object_info.name, &ObjectOptions {
version_id: object_info.version_id.as_ref().map(|v| v.to_string()),
user_defined: object_info.user_defined.clone(),
src_pool_idx: pool_idx,
data_movement: true,
..Default::default()
})
.await
{
Ok(res) => res,
@@ -955,17 +943,10 @@ impl ECStore {
let mut data = PutObjReader::from_vec(chunk);
let pi = match self
.put_object_part(
&bucket,
&object_info.name,
&res.upload_id,
part.number,
&mut data,
&ObjectOptions {
preserve_etag: Some(part.etag.clone()),
..Default::default()
},
)
.put_object_part(&bucket, &object_info.name, &res.upload_id, part.number, &mut data, &ObjectOptions {
preserve_etag: Some(part.etag.clone()),
..Default::default()
})
.await
{
Ok(pi) => pi,
@@ -983,17 +964,11 @@ impl ECStore {
if let Err(err) = self
.clone()
.complete_multipart_upload(
&bucket,
&object_info.name,
&res.upload_id,
parts,
&ObjectOptions {
data_movement: true,
mod_time: object_info.mod_time,
..Default::default()
},
)
.complete_multipart_upload(&bucket, &object_info.name, &res.upload_id, parts, &ObjectOptions {
data_movement: true,
mod_time: object_info.mod_time,
..Default::default()
})
.await
{
error!("rebalance_object: complete_multipart_upload err {:?}", &err);
@@ -1008,21 +983,16 @@ impl ECStore {
let mut data = PutObjReader::new(hrd);
if let Err(err) = self
.put_object(
&bucket,
&object_info.name,
&mut data,
&ObjectOptions {
src_pool_idx: pool_idx,
data_movement: true,
version_id: object_info.version_id.as_ref().map(|v| v.to_string()),
mod_time: object_info.mod_time,
user_defined: object_info.user_defined.clone(),
preserve_etag: object_info.etag.clone(),
.put_object(&bucket, &object_info.name, &mut data, &ObjectOptions {
src_pool_idx: pool_idx,
data_movement: true,
version_id: object_info.version_id.as_ref().map(|v| v.to_string()),
mod_time: object_info.mod_time,
user_defined: object_info.user_defined.clone(),
preserve_etag: object_info.etag.clone(),
..Default::default()
},
)
..Default::default()
})
.await
{
error!("rebalance_object: put_object err {:?}", &err);
@@ -1167,36 +1137,33 @@ impl SetDisks {
};
let cb1 = cb.clone();
list_path_raw(
rx,
ListPathRawOptions {
disks: disks.iter().cloned().map(Some).collect(),
bucket: bucket.clone(),
recursice: true,
min_disks: listing_quorum,
agreed: Some(Box::new(move |entry: MetaCacheEntry| {
info!("list_objects_to_rebalance: agreed: {:?}", &entry.name);
Box::pin(cb1(entry))
})),
partial: Some(Box::new(move |entries: MetaCacheEntries, _: &[Option<DiskError>]| {
// let cb = cb.clone();
let resolver = resolver.clone();
let cb = cb.clone();
list_path_raw(rx, ListPathRawOptions {
disks: disks.iter().cloned().map(Some).collect(),
bucket: bucket.clone(),
recursice: true,
min_disks: listing_quorum,
agreed: Some(Box::new(move |entry: MetaCacheEntry| {
info!("list_objects_to_rebalance: agreed: {:?}", &entry.name);
Box::pin(cb1(entry))
})),
partial: Some(Box::new(move |entries: MetaCacheEntries, _: &[Option<DiskError>]| {
// let cb = cb.clone();
let resolver = resolver.clone();
let cb = cb.clone();
match entries.resolve(resolver) {
Some(entry) => {
info!("list_objects_to_rebalance: list_objects_to_decommission get {}", &entry.name);
Box::pin(async move { cb(entry).await })
}
None => {
info!("list_objects_to_rebalance: list_objects_to_decommission get none");
Box::pin(async {})
}
match entries.resolve(resolver) {
Some(entry) => {
info!("list_objects_to_rebalance: list_objects_to_decommission get {}", &entry.name);
Box::pin(async move { cb(entry).await })
}
})),
..Default::default()
},
)
None => {
info!("list_objects_to_rebalance: list_objects_to_decommission get none");
Box::pin(async {})
}
}
})),
..Default::default()
})
.await?;
info!("list_objects_to_rebalance: list_objects_to_rebalance done");

View File

@@ -262,13 +262,10 @@ impl Node for NodeService {
let request = request.into_inner();
match self
.local_peer
.delete_bucket(
&request.bucket,
&DeleteBucketOptions {
force: false,
..Default::default()
},
)
.delete_bucket(&request.bucket, &DeleteBucketOptions {
force: false,
..Default::default()
})
.await
{
Ok(_) => Ok(tonic::Response::new(DeleteBucketResponse {

View File

@@ -406,17 +406,11 @@ impl SetDisks {
let src_object = src_object.clone();
futures.push(tokio::spawn(async move {
let _ = disk
.delete_version(
&src_bucket,
&src_object,
fi,
false,
DeleteOptions {
undo_write: true,
old_data_dir,
..Default::default()
},
)
.delete_version(&src_bucket, &src_object, fi, false, DeleteOptions {
undo_write: true,
old_data_dir,
..Default::default()
})
.await
.map_err(|e| {
debug!("rename_data delete_version err {:?}", e);
@@ -489,14 +483,10 @@ impl SetDisks {
tokio::spawn(async move {
if let Some(disk) = disk {
(disk
.delete(
&bucket,
&file_path,
DeleteOptions {
recursive: true,
..Default::default()
},
)
.delete(&bucket, &file_path, DeleteOptions {
recursive: true,
..Default::default()
})
.await)
.err()
} else {
@@ -695,14 +685,10 @@ impl SetDisks {
if let Some(disk) = disks[i].as_ref() {
let _ = disk
.delete(
bucket,
&path_join_buf(&[prefix, STORAGE_FORMAT_FILE]),
DeleteOptions {
recursive: true,
..Default::default()
},
)
.delete(bucket, &path_join_buf(&[prefix, STORAGE_FORMAT_FILE]), DeleteOptions {
recursive: true,
..Default::default()
})
.await
.map_err(|e| {
warn!("write meta revert err {:?}", e);
@@ -1681,14 +1667,10 @@ impl SetDisks {
for disk in disks.iter() {
futures.push(async move {
if let Some(disk) = disk {
disk.delete(
bucket,
prefix,
DeleteOptions {
recursive: true,
..Default::default()
},
)
disk.delete(bucket, prefix, DeleteOptions {
recursive: true,
..Default::default()
})
.await
} else {
Err(DiskError::DiskNotFound)
@@ -2409,17 +2391,10 @@ impl SetDisks {
// Allow for dangling deletes, on versions that have DataDir missing etc.
// this would end up restoring the correct readable versions.
return match self
.delete_if_dang_ling(
bucket,
object,
&parts_metadata,
&errs,
&data_errs_by_part,
ObjectOptions {
version_id: version_id_op.clone(),
..Default::default()
},
)
.delete_if_dang_ling(bucket, object, &parts_metadata, &errs, &data_errs_by_part, ObjectOptions {
version_id: version_id_op.clone(),
..Default::default()
})
.await
{
Ok(m) => {
@@ -2725,15 +2700,11 @@ impl SetDisks {
if parts_metadata[index].is_remote() {
let rm_data_dir = parts_metadata[index].data_dir.unwrap().to_string();
let d_path = Path::new(&encode_dir_object(object)).join(rm_data_dir);
disk.delete(
bucket,
d_path.to_str().unwrap(),
DeleteOptions {
immediate: true,
recursive: true,
..Default::default()
},
)
disk.delete(bucket, d_path.to_str().unwrap(), DeleteOptions {
immediate: true,
recursive: true,
..Default::default()
})
.await?;
}
@@ -2756,17 +2727,10 @@ impl SetDisks {
Err(err) => {
let data_errs_by_part = HashMap::new();
match self
.delete_if_dang_ling(
bucket,
object,
&parts_metadata,
&errs,
&data_errs_by_part,
ObjectOptions {
version_id: version_id_op.clone(),
..Default::default()
},
)
.delete_if_dang_ling(bucket, object, &parts_metadata, &errs, &data_errs_by_part, ObjectOptions {
version_id: version_id_op.clone(),
..Default::default()
})
.await
{
Ok(m) => {
@@ -2822,15 +2786,11 @@ impl SetDisks {
let object = object.to_string();
futures.push(tokio::spawn(async move {
let _ = disk
.delete(
&bucket,
&object,
DeleteOptions {
recursive: false,
immediate: false,
..Default::default()
},
)
.delete(&bucket, &object, DeleteOptions {
recursive: false,
immediate: false,
..Default::default()
})
.await;
}));
}
@@ -3525,16 +3485,11 @@ impl SetDisks {
Ok(fivs) => fivs,
Err(err) => {
match self_clone
.heal_object(
&bucket,
&encoded_entry_name,
"",
&HealOpts {
scan_mode,
remove: HEAL_DELETE_DANGLING,
..Default::default()
},
)
.heal_object(&bucket, &encoded_entry_name, "", &HealOpts {
scan_mode,
remove: HEAL_DELETE_DANGLING,
..Default::default()
})
.await
{
Ok((res, None)) => {
@@ -3651,56 +3606,53 @@ impl SetDisks {
let bucket_partial = bucket.clone();
let heal_entry_agree = heal_entry.clone();
let heal_entry_partial = heal_entry.clone();
if let Err(err) = list_path_raw(
rx,
ListPathRawOptions {
disks,
fallback_disks,
bucket: bucket.clone(),
recursice: true,
forward_to,
min_disks: 1,
report_not_found: false,
agreed: Some(Box::new(move |entry: MetaCacheEntry| {
let jt = jt_agree.clone();
let bucket = bucket_agree.clone();
let heal_entry = heal_entry_agree.clone();
Box::pin(async move {
if let Err(err) = list_path_raw(rx, ListPathRawOptions {
disks,
fallback_disks,
bucket: bucket.clone(),
recursice: true,
forward_to,
min_disks: 1,
report_not_found: false,
agreed: Some(Box::new(move |entry: MetaCacheEntry| {
let jt = jt_agree.clone();
let bucket = bucket_agree.clone();
let heal_entry = heal_entry_agree.clone();
Box::pin(async move {
jt.take().await;
let bucket = bucket.clone();
tokio::spawn(async move {
heal_entry(bucket, entry).await;
});
})
})),
partial: Some(Box::new(move |entries: MetaCacheEntries, _: &[Option<DiskError>]| {
let jt = jt_partial.clone();
let bucket = bucket_partial.clone();
let heal_entry = heal_entry_partial.clone();
Box::pin({
let heal_entry = heal_entry.clone();
let resolver = resolver.clone();
async move {
let entry = if let Some(entry) = entries.resolve(resolver) {
entry
} else if let (Some(entry), _) = entries.first_found() {
entry
} else {
return;
};
jt.take().await;
let bucket = bucket.clone();
let heal_entry = heal_entry.clone();
tokio::spawn(async move {
heal_entry(bucket, entry).await;
});
})
})),
partial: Some(Box::new(move |entries: MetaCacheEntries, _: &[Option<DiskError>]| {
let jt = jt_partial.clone();
let bucket = bucket_partial.clone();
let heal_entry = heal_entry_partial.clone();
Box::pin({
let heal_entry = heal_entry.clone();
let resolver = resolver.clone();
async move {
let entry = if let Some(entry) = entries.resolve(resolver) {
entry
} else if let (Some(entry), _) = entries.first_found() {
entry
} else {
return;
};
jt.take().await;
let bucket = bucket.clone();
let heal_entry = heal_entry.clone();
tokio::spawn(async move {
heal_entry(bucket, entry).await;
});
}
})
})),
finished: None,
..Default::default()
},
)
}
})
})),
finished: None,
..Default::default()
})
.await
{
ret_err = Some(err.into());
@@ -3743,15 +3695,11 @@ impl SetDisks {
let prefix = prefix.to_string();
futures.push(async move {
if let Some(disk) = disk_op {
disk.delete(
&bucket,
&prefix,
DeleteOptions {
recursive: true,
immediate: true,
..Default::default()
},
)
disk.delete(&bucket, &prefix, DeleteOptions {
recursive: true,
immediate: true,
..Default::default()
})
.await
} else {
Ok(())

View File

@@ -557,14 +557,11 @@ impl StorageAPI for Sets {
let idx = self.get_hashed_set_index(obj.object_name.as_str());
if !set_obj_map.contains_key(&idx) {
set_obj_map.insert(
idx,
vec![DelObj {
// set_idx: idx,
orig_idx: i,
obj: obj.clone(),
}],
);
set_obj_map.insert(idx, vec![DelObj {
// set_idx: idx,
orig_idx: i,
obj: obj.clone(),
}]);
} else if let Some(val) = set_obj_map.get_mut(&idx) {
val.push(DelObj {
// set_idx: idx,
@@ -756,13 +753,10 @@ impl StorageAPI for Sets {
#[tracing::instrument(skip(self))]
async fn heal_format(&self, dry_run: bool) -> Result<(HealResultItem, Option<Error>)> {
let (disks, _) = init_storage_disks_with_errors(
&self.endpoints.endpoints,
&DiskOption {
cleanup: false,
health_check: false,
},
)
let (disks, _) = init_storage_disks_with_errors(&self.endpoints.endpoints, &DiskOption {
cleanup: false,
health_check: false,
})
.await;
let (formats, errs) = load_format_erasure_all(&disks, true).await;
if let Err(err) = check_format_erasure_values(&formats, self.set_drive_count) {

View File

@@ -154,13 +154,10 @@ impl ECStore {
// validate_parity(partiy_count, pool_eps.drives_per_set)?;
let (disks, errs) = store_init::init_disks(
&pool_eps.endpoints,
&DiskOption {
cleanup: true,
health_check: true,
},
)
let (disks, errs) = store_init::init_disks(&pool_eps.endpoints, &DiskOption {
cleanup: true,
health_check: true,
})
.await;
check_disk_fatal_errs(&errs)?;
@@ -504,14 +501,10 @@ impl ECStore {
}
async fn delete_prefix(&self, bucket: &str, object: &str) -> Result<()> {
for pool in self.pools.iter() {
pool.delete_object(
bucket,
object,
ObjectOptions {
delete_prefix: true,
..Default::default()
},
)
pool.delete_object(bucket, object, ObjectOptions {
delete_prefix: true,
..Default::default()
})
.await?;
}
@@ -621,15 +614,11 @@ impl ECStore {
async fn get_pool_idx(&self, bucket: &str, object: &str, size: i64) -> Result<usize> {
let idx = match self
.get_pool_idx_existing_with_opts(
bucket,
object,
&ObjectOptions {
skip_decommissioned: true,
skip_rebalancing: true,
..Default::default()
},
)
.get_pool_idx_existing_with_opts(bucket, object, &ObjectOptions {
skip_decommissioned: true,
skip_rebalancing: true,
..Default::default()
})
.await
{
Ok(res) => res,
@@ -670,16 +659,12 @@ impl ECStore {
}
async fn get_pool_idx_existing_no_lock(&self, bucket: &str, object: &str) -> Result<usize> {
self.get_pool_idx_existing_with_opts(
bucket,
object,
&ObjectOptions {
no_lock: true,
skip_decommissioned: true,
skip_rebalancing: true,
..Default::default()
},
)
self.get_pool_idx_existing_with_opts(bucket, object, &ObjectOptions {
no_lock: true,
skip_decommissioned: true,
skip_rebalancing: true,
..Default::default()
})
.await
}
@@ -1374,14 +1359,11 @@ impl StorageAPI for ECStore {
if let Err(err) = self.peer_sys.make_bucket(bucket, opts).await {
if !is_err_bucket_exists(&err.into()) {
let _ = self
.delete_bucket(
bucket,
&DeleteBucketOptions {
no_lock: true,
no_recreate: true,
..Default::default()
},
)
.delete_bucket(bucket, &DeleteBucketOptions {
no_lock: true,
no_recreate: true,
..Default::default()
})
.await;
}
};
@@ -1684,14 +1666,10 @@ impl StorageAPI for ECStore {
for obj in objects.iter() {
futures.push(async move {
self.internal_get_pool_info_existing_with_opts(
bucket,
&obj.object_name,
&ObjectOptions {
no_lock: true,
..Default::default()
},
)
self.internal_get_pool_info_existing_with_opts(bucket, &obj.object_name, &ObjectOptions {
no_lock: true,
..Default::default()
})
.await
});
}

View File

@@ -263,14 +263,10 @@ impl ECStore {
// use get
if !opts.prefix.is_empty() && opts.limit == 1 && opts.marker.is_none() {
match self
.get_object_info(
&opts.bucket,
&opts.prefix,
&ObjectOptions {
no_lock: true,
..Default::default()
},
)
.get_object_info(&opts.bucket, &opts.prefix, &ObjectOptions {
no_lock: true,
..Default::default()
})
.await
{
Ok(res) => {
@@ -769,48 +765,45 @@ impl ECStore {
let tx1 = sender.clone();
let tx2 = sender.clone();
list_path_raw(
rx.resubscribe(),
ListPathRawOptions {
disks: disks.iter().cloned().map(Some).collect(),
fallback_disks: fallback_disks.iter().cloned().map(Some).collect(),
bucket: bucket.to_owned(),
path,
recursice: true,
filter_prefix: Some(filter_prefix),
forward_to: opts.marker.clone(),
min_disks: listing_quorum,
per_disk_limit: opts.limit as i32,
agreed: Some(Box::new(move |entry: MetaCacheEntry| {
Box::pin({
let value = tx1.clone();
async move {
if entry.is_dir() {
return;
}
list_path_raw(rx.resubscribe(), ListPathRawOptions {
disks: disks.iter().cloned().map(Some).collect(),
fallback_disks: fallback_disks.iter().cloned().map(Some).collect(),
bucket: bucket.to_owned(),
path,
recursice: true,
filter_prefix: Some(filter_prefix),
forward_to: opts.marker.clone(),
min_disks: listing_quorum,
per_disk_limit: opts.limit as i32,
agreed: Some(Box::new(move |entry: MetaCacheEntry| {
Box::pin({
let value = tx1.clone();
async move {
if entry.is_dir() {
return;
}
if let Err(err) = value.send(entry).await {
error!("list_path send fail {:?}", err);
}
}
})
})),
partial: Some(Box::new(move |entries: MetaCacheEntries, _: &[Option<DiskError>]| {
Box::pin({
let value = tx2.clone();
let resolver = resolver.clone();
async move {
if let Some(entry) = entries.resolve(resolver) {
if let Err(err) = value.send(entry).await {
error!("list_path send fail {:?}", err);
}
}
})
})),
partial: Some(Box::new(move |entries: MetaCacheEntries, _: &[Option<DiskError>]| {
Box::pin({
let value = tx2.clone();
let resolver = resolver.clone();
async move {
if let Some(entry) = entries.resolve(resolver) {
if let Err(err) = value.send(entry).await {
error!("list_path send fail {:?}", err);
}
}
}
})
})),
finished: None,
..Default::default()
},
)
}
})
})),
finished: None,
..Default::default()
})
.await
});
}
@@ -1279,45 +1272,42 @@ impl SetDisks {
let tx1 = sender.clone();
let tx2 = sender.clone();
list_path_raw(
rx,
ListPathRawOptions {
disks: disks.iter().cloned().map(Some).collect(),
fallback_disks: fallback_disks.iter().cloned().map(Some).collect(),
bucket: opts.bucket,
path: opts.base_dir,
recursice: opts.recursive,
filter_prefix: opts.filter_prefix,
forward_to: opts.marker,
min_disks: listing_quorum,
per_disk_limit: limit,
agreed: Some(Box::new(move |entry: MetaCacheEntry| {
Box::pin({
let value = tx1.clone();
async move {
list_path_raw(rx, ListPathRawOptions {
disks: disks.iter().cloned().map(Some).collect(),
fallback_disks: fallback_disks.iter().cloned().map(Some).collect(),
bucket: opts.bucket,
path: opts.base_dir,
recursice: opts.recursive,
filter_prefix: opts.filter_prefix,
forward_to: opts.marker,
min_disks: listing_quorum,
per_disk_limit: limit,
agreed: Some(Box::new(move |entry: MetaCacheEntry| {
Box::pin({
let value = tx1.clone();
async move {
if let Err(err) = value.send(entry).await {
error!("list_path send fail {:?}", err);
}
}
})
})),
partial: Some(Box::new(move |entries: MetaCacheEntries, _: &[Option<DiskError>]| {
Box::pin({
let value = tx2.clone();
let resolver = resolver.clone();
async move {
if let Some(entry) = entries.resolve(resolver) {
if let Err(err) = value.send(entry).await {
error!("list_path send fail {:?}", err);
}
}
})
})),
partial: Some(Box::new(move |entries: MetaCacheEntries, _: &[Option<DiskError>]| {
Box::pin({
let value = tx2.clone();
let resolver = resolver.clone();
async move {
if let Some(entry) = entries.resolve(resolver) {
if let Err(err) = value.send(entry).await {
error!("list_path send fail {:?}", err);
}
}
}
})
})),
finished: None,
..Default::default()
},
)
}
})
})),
finished: None,
..Default::default()
})
.await
.map_err(Error::other)
}

View File

@@ -365,15 +365,10 @@ impl TierConfigMgr {
file: &str,
data: Bytes,
) -> std::result::Result<(), std::io::Error> {
self.save_config_with_opts(
api,
file,
data,
&ObjectOptions {
max_parity: true,
..Default::default()
},
)
self.save_config_with_opts(api, file, data, &ObjectOptions {
max_parity: true,
..Default::default()
})
.await
}

View File

@@ -101,19 +101,13 @@ impl WarmBackend for WarmBackendMinIO {
let part_size = optimal_part_size(length)?;
let client = self.0.client.clone();
let res = client
.put_object(
&self.0.bucket,
&self.0.get_dest(object),
r,
length,
&PutObjectOptions {
storage_class: self.0.storage_class.clone(),
part_size: part_size as u64,
disable_content_sha256: true,
user_metadata: meta,
..Default::default()
},
)
.put_object(&self.0.bucket, &self.0.get_dest(object), r, length, &PutObjectOptions {
storage_class: self.0.storage_class.clone(),
part_size: part_size as u64,
disable_content_sha256: true,
user_metadata: meta,
..Default::default()
})
.await?;
//self.ToObjectError(err, object)
Ok(res.version_id)

View File

@@ -98,19 +98,13 @@ impl WarmBackend for WarmBackendRustFS {
let part_size = optimal_part_size(length)?;
let client = self.0.client.clone();
let res = client
.put_object(
&self.0.bucket,
&self.0.get_dest(object),
r,
length,
&PutObjectOptions {
storage_class: self.0.storage_class.clone(),
part_size: part_size as u64,
disable_content_sha256: true,
user_metadata: meta,
..Default::default()
},
)
.put_object(&self.0.bucket, &self.0.get_dest(object), r, length, &PutObjectOptions {
storage_class: self.0.storage_class.clone(),
part_size: part_size as u64,
disable_content_sha256: true,
user_metadata: meta,
..Default::default()
})
.await?;
//self.ToObjectError(err, object)
Ok(res.version_id)

View File

@@ -127,18 +127,12 @@ impl WarmBackend for WarmBackendS3 {
) -> Result<String, std::io::Error> {
let client = self.client.clone();
let res = client
.put_object(
&self.bucket,
&self.get_dest(object),
r,
length,
&PutObjectOptions {
send_content_md5: true,
storage_class: self.storage_class.clone(),
user_metadata: meta,
..Default::default()
},
)
.put_object(&self.bucket, &self.get_dest(object), r, length, &PutObjectOptions {
send_content_md5: true,
storage_class: self.storage_class.clone(),
user_metadata: meta,
..Default::default()
})
.await?;
Ok(res.version_id)
}

View File

@@ -1575,14 +1575,11 @@ pub fn get_default_policyes() -> HashMap<String, PolicyDoc> {
default_policies
.iter()
.map(|(n, p)| {
(
n.to_string(),
PolicyDoc {
version: 1,
policy: p.clone(),
..Default::default()
},
)
(n.to_string(), PolicyDoc {
version: 1,
policy: p.clone(),
..Default::default()
})
})
.collect()
}

View File

@@ -140,20 +140,17 @@ impl Locker for LocalLocker {
}
args.resources.iter().enumerate().for_each(|(idx, resource)| {
self.lock_map.insert(
resource.to_string(),
vec![LockRequesterInfo {
name: resource.to_string(),
writer: true,
source: args.source.to_string(),
owner: args.owner.to_string(),
uid: args.uid.to_string(),
group: args.resources.len() > 1,
quorum: args.quorum,
idx,
..Default::default()
}],
);
self.lock_map.insert(resource.to_string(), vec![LockRequesterInfo {
name: resource.to_string(),
writer: true,
source: args.source.to_string(),
owner: args.owner.to_string(),
uid: args.uid.to_string(),
group: args.resources.len() > 1,
quorum: args.quorum,
idx,
..Default::default()
}]);
let mut uuid = args.uid.to_string();
format_uuid(&mut uuid, &idx);
@@ -230,18 +227,15 @@ impl Locker for LocalLocker {
}
}
None => {
self.lock_map.insert(
resource.to_string(),
vec![LockRequesterInfo {
name: resource.to_string(),
writer: false,
source: args.source.to_string(),
owner: args.owner.to_string(),
uid: args.uid.to_string(),
quorum: args.quorum,
..Default::default()
}],
);
self.lock_map.insert(resource.to_string(), vec![LockRequesterInfo {
name: resource.to_string(),
writer: false,
source: args.source.to_string(),
owner: args.owner.to_string(),
uid: args.uid.to_string(),
quorum: args.quorum,
..Default::default()
}]);
}
}
let mut uuid = args.uid.to_string();

View File

@@ -46,10 +46,10 @@ async fn run(bucket: String, object: String, user: String, service_name: String)
// Record Metrics
let meter = global::meter("rustfs");
let request_duration = meter.f64_histogram("s3_request_duration_seconds").build();
request_duration.record(
start_time.elapsed().unwrap().as_secs_f64(),
&[opentelemetry::KeyValue::new("operation", "run")],
);
request_duration.record(start_time.elapsed().unwrap().as_secs_f64(), &[opentelemetry::KeyValue::new(
"operation",
"run",
)]);
match SystemObserver::init_process_observer(meter).await {
Ok(_) => info!("Process observer initialized successfully"),
@@ -84,10 +84,10 @@ async fn put_object(bucket: String, object: String, user: String) {
let meter = global::meter("rustfs");
let request_duration = meter.f64_histogram("s3_request_duration_seconds").build();
request_duration.record(
start_time.elapsed().unwrap().as_secs_f64(),
&[opentelemetry::KeyValue::new("operation", "put_object")],
);
request_duration.record(start_time.elapsed().unwrap().as_secs_f64(), &[opentelemetry::KeyValue::new(
"operation",
"put_object",
)]);
info!(
"Starting PUT operation content: bucket = {}, object = {}, user = {},start_time = {}",

View File

@@ -115,24 +115,18 @@ impl Collector {
let transmitted = data.transmitted() as i64;
self.metrics.network_io_per_interface.record(
received,
&[
&self.attributes.attributes[..],
&[
KeyValue::new(INTERFACE, interface_name.to_string()),
KeyValue::new(DIRECTION, "received"),
],
]
&[&self.attributes.attributes[..], &[
KeyValue::new(INTERFACE, interface_name.to_string()),
KeyValue::new(DIRECTION, "received"),
]]
.concat(),
);
self.metrics.network_io_per_interface.record(
transmitted,
&[
&self.attributes.attributes[..],
&[
KeyValue::new(INTERFACE, interface_name.to_string()),
KeyValue::new(DIRECTION, "transmitted"),
],
]
&[&self.attributes.attributes[..], &[
KeyValue::new(INTERFACE, interface_name.to_string()),
KeyValue::new(DIRECTION, "transmitted"),
]]
.concat(),
);
}
@@ -155,10 +149,10 @@ impl Collector {
};
self.metrics.process_status.record(
status_value,
&[
&self.attributes.attributes[..],
&[KeyValue::new(STATUS, format!("{:?}", process.status()))],
]
&[&self.attributes.attributes[..], &[KeyValue::new(
STATUS,
format!("{:?}", process.status()),
)]]
.concat(),
);

View File

@@ -276,12 +276,150 @@ pub mod default {
#[allow(clippy::incompatible_msrv)]
pub static DEFAULT_POLICIES: LazyLock<[(&'static str, Policy); 6]> = LazyLock::new(|| {
[
(
"readwrite",
Policy {
id: "".into(),
version: DEFAULT_VERSION.into(),
statements: vec![Statement {
("readwrite", Policy {
id: "".into(),
version: DEFAULT_VERSION.into(),
statements: vec![Statement {
sid: "".into(),
effect: Effect::Allow,
actions: ActionSet({
let mut hash_set = HashSet::new();
hash_set.insert(Action::S3Action(S3Action::AllActions));
hash_set
}),
not_actions: ActionSet(Default::default()),
resources: ResourceSet({
let mut hash_set = HashSet::new();
hash_set.insert(Resource::S3("*".into()));
hash_set
}),
conditions: Functions::default(),
..Default::default()
}],
}),
("readonly", Policy {
id: "".into(),
version: DEFAULT_VERSION.into(),
statements: vec![Statement {
sid: "".into(),
effect: Effect::Allow,
actions: ActionSet({
let mut hash_set = HashSet::new();
hash_set.insert(Action::S3Action(S3Action::GetBucketLocationAction));
hash_set.insert(Action::S3Action(S3Action::GetObjectAction));
hash_set
}),
not_actions: ActionSet(Default::default()),
resources: ResourceSet({
let mut hash_set = HashSet::new();
hash_set.insert(Resource::S3("*".into()));
hash_set
}),
conditions: Functions::default(),
..Default::default()
}],
}),
("writeonly", Policy {
id: "".into(),
version: DEFAULT_VERSION.into(),
statements: vec![Statement {
sid: "".into(),
effect: Effect::Allow,
actions: ActionSet({
let mut hash_set = HashSet::new();
hash_set.insert(Action::S3Action(S3Action::PutObjectAction));
hash_set
}),
not_actions: ActionSet(Default::default()),
resources: ResourceSet({
let mut hash_set = HashSet::new();
hash_set.insert(Resource::S3("*".into()));
hash_set
}),
conditions: Functions::default(),
..Default::default()
}],
}),
("writeonly", Policy {
id: "".into(),
version: DEFAULT_VERSION.into(),
statements: vec![Statement {
sid: "".into(),
effect: Effect::Allow,
actions: ActionSet({
let mut hash_set = HashSet::new();
hash_set.insert(Action::S3Action(S3Action::PutObjectAction));
hash_set
}),
not_actions: ActionSet(Default::default()),
resources: ResourceSet({
let mut hash_set = HashSet::new();
hash_set.insert(Resource::S3("*".into()));
hash_set
}),
conditions: Functions::default(),
..Default::default()
}],
}),
("diagnostics", Policy {
id: "".into(),
version: DEFAULT_VERSION.into(),
statements: vec![Statement {
sid: "".into(),
effect: Effect::Allow,
actions: ActionSet({
let mut hash_set = HashSet::new();
hash_set.insert(Action::AdminAction(AdminAction::ProfilingAdminAction));
hash_set.insert(Action::AdminAction(AdminAction::TraceAdminAction));
hash_set.insert(Action::AdminAction(AdminAction::ConsoleLogAdminAction));
hash_set.insert(Action::AdminAction(AdminAction::ServerInfoAdminAction));
hash_set.insert(Action::AdminAction(AdminAction::TopLocksAdminAction));
hash_set.insert(Action::AdminAction(AdminAction::HealthInfoAdminAction));
hash_set.insert(Action::AdminAction(AdminAction::PrometheusAdminAction));
hash_set.insert(Action::AdminAction(AdminAction::BandwidthMonitorAction));
hash_set
}),
not_actions: ActionSet(Default::default()),
resources: ResourceSet({
let mut hash_set = HashSet::new();
hash_set.insert(Resource::S3("*".into()));
hash_set
}),
conditions: Functions::default(),
..Default::default()
}],
}),
("consoleAdmin", Policy {
id: "".into(),
version: DEFAULT_VERSION.into(),
statements: vec![
Statement {
sid: "".into(),
effect: Effect::Allow,
actions: ActionSet({
let mut hash_set = HashSet::new();
hash_set.insert(Action::AdminAction(AdminAction::AllAdminActions));
hash_set
}),
not_actions: ActionSet(Default::default()),
resources: ResourceSet(HashSet::new()),
conditions: Functions::default(),
..Default::default()
},
Statement {
sid: "".into(),
effect: Effect::Allow,
actions: ActionSet({
let mut hash_set = HashSet::new();
hash_set.insert(Action::KmsAction(KmsAction::AllActions));
hash_set
}),
not_actions: ActionSet(Default::default()),
resources: ResourceSet(HashSet::new()),
conditions: Functions::default(),
..Default::default()
},
Statement {
sid: "".into(),
effect: Effect::Allow,
actions: ActionSet({
@@ -297,165 +435,9 @@ pub mod default {
}),
conditions: Functions::default(),
..Default::default()
}],
},
),
(
"readonly",
Policy {
id: "".into(),
version: DEFAULT_VERSION.into(),
statements: vec![Statement {
sid: "".into(),
effect: Effect::Allow,
actions: ActionSet({
let mut hash_set = HashSet::new();
hash_set.insert(Action::S3Action(S3Action::GetBucketLocationAction));
hash_set.insert(Action::S3Action(S3Action::GetObjectAction));
hash_set
}),
not_actions: ActionSet(Default::default()),
resources: ResourceSet({
let mut hash_set = HashSet::new();
hash_set.insert(Resource::S3("*".into()));
hash_set
}),
conditions: Functions::default(),
..Default::default()
}],
},
),
(
"writeonly",
Policy {
id: "".into(),
version: DEFAULT_VERSION.into(),
statements: vec![Statement {
sid: "".into(),
effect: Effect::Allow,
actions: ActionSet({
let mut hash_set = HashSet::new();
hash_set.insert(Action::S3Action(S3Action::PutObjectAction));
hash_set
}),
not_actions: ActionSet(Default::default()),
resources: ResourceSet({
let mut hash_set = HashSet::new();
hash_set.insert(Resource::S3("*".into()));
hash_set
}),
conditions: Functions::default(),
..Default::default()
}],
},
),
(
"writeonly",
Policy {
id: "".into(),
version: DEFAULT_VERSION.into(),
statements: vec![Statement {
sid: "".into(),
effect: Effect::Allow,
actions: ActionSet({
let mut hash_set = HashSet::new();
hash_set.insert(Action::S3Action(S3Action::PutObjectAction));
hash_set
}),
not_actions: ActionSet(Default::default()),
resources: ResourceSet({
let mut hash_set = HashSet::new();
hash_set.insert(Resource::S3("*".into()));
hash_set
}),
conditions: Functions::default(),
..Default::default()
}],
},
),
(
"diagnostics",
Policy {
id: "".into(),
version: DEFAULT_VERSION.into(),
statements: vec![Statement {
sid: "".into(),
effect: Effect::Allow,
actions: ActionSet({
let mut hash_set = HashSet::new();
hash_set.insert(Action::AdminAction(AdminAction::ProfilingAdminAction));
hash_set.insert(Action::AdminAction(AdminAction::TraceAdminAction));
hash_set.insert(Action::AdminAction(AdminAction::ConsoleLogAdminAction));
hash_set.insert(Action::AdminAction(AdminAction::ServerInfoAdminAction));
hash_set.insert(Action::AdminAction(AdminAction::TopLocksAdminAction));
hash_set.insert(Action::AdminAction(AdminAction::HealthInfoAdminAction));
hash_set.insert(Action::AdminAction(AdminAction::PrometheusAdminAction));
hash_set.insert(Action::AdminAction(AdminAction::BandwidthMonitorAction));
hash_set
}),
not_actions: ActionSet(Default::default()),
resources: ResourceSet({
let mut hash_set = HashSet::new();
hash_set.insert(Resource::S3("*".into()));
hash_set
}),
conditions: Functions::default(),
..Default::default()
}],
},
),
(
"consoleAdmin",
Policy {
id: "".into(),
version: DEFAULT_VERSION.into(),
statements: vec![
Statement {
sid: "".into(),
effect: Effect::Allow,
actions: ActionSet({
let mut hash_set = HashSet::new();
hash_set.insert(Action::AdminAction(AdminAction::AllAdminActions));
hash_set
}),
not_actions: ActionSet(Default::default()),
resources: ResourceSet(HashSet::new()),
conditions: Functions::default(),
..Default::default()
},
Statement {
sid: "".into(),
effect: Effect::Allow,
actions: ActionSet({
let mut hash_set = HashSet::new();
hash_set.insert(Action::KmsAction(KmsAction::AllActions));
hash_set
}),
not_actions: ActionSet(Default::default()),
resources: ResourceSet(HashSet::new()),
conditions: Functions::default(),
..Default::default()
},
Statement {
sid: "".into(),
effect: Effect::Allow,
actions: ActionSet({
let mut hash_set = HashSet::new();
hash_set.insert(Action::S3Action(S3Action::AllActions));
hash_set
}),
not_actions: ActionSet(Default::default()),
resources: ResourceSet({
let mut hash_set = HashSet::new();
hash_set.insert(Resource::S3("*".into()));
hash_set
}),
conditions: Functions::default(),
..Default::default()
},
],
},
),
},
],
}),
]
});
}

View File

@@ -85,13 +85,9 @@ fn main() -> Result<(), AnyError> {
Err(_) => "flatc".to_string(),
};
compile_flatbuffers_models(
&mut generated_mod_rs,
&flatc_path,
proto_dir.clone(),
flatbuffer_out_dir.clone(),
vec!["models"],
)?;
compile_flatbuffers_models(&mut generated_mod_rs, &flatc_path, proto_dir.clone(), flatbuffer_out_dir.clone(), vec![
"models",
])?;
fmt();
Ok(())

View File

@@ -17,9 +17,9 @@ use lazy_static::lazy_static;
use std::collections::HashMap;
use time::{OffsetDateTime, macros::format_description};
use s3s::Body;
use super::request_signature_v4::{SERVICE_TYPE_S3, get_scope, get_signature, get_signing_key};
use rustfs_utils::hash::EMPTY_STRING_SHA256_HASH;
use s3s::Body;
const STREAMING_SIGN_ALGORITHM: &str = "STREAMING-AWS4-HMAC-SHA256-PAYLOAD";
const STREAMING_SIGN_TRAILER_ALGORITHM: &str = "STREAMING-AWS4-HMAC-SHA256-PAYLOAD-TRAILER";

View File

@@ -19,15 +19,14 @@ use std::collections::HashMap;
use std::fmt::Write;
use time::{OffsetDateTime, format_description};
use s3s::Body;
use super::utils::get_host_addr;
use rustfs_utils::crypto::{base64_encode, hex, hmac_sha1};
use s3s::Body;
const _SIGN_V4_ALGORITHM: &str = "AWS4-HMAC-SHA256";
const SIGN_V2_ALGORITHM: &str = "AWS";
fn encode_url2path(req: &request::Request<Body>, _virtual_host: bool) -> String {
req.uri().path().to_string()
}

View File

@@ -22,11 +22,11 @@ use std::fmt::Write;
use time::{OffsetDateTime, macros::format_description};
use tracing::debug;
use rustfs_utils::crypto::{hex, hex_sha256, hmac_sha256};
use s3s::Body;
use super::constants::UNSIGNED_PAYLOAD;
use super::request_signature_streaming_unsigned_trailer::streaming_unsigned_v4;
use super::utils::{get_host_addr, sign_v4_trim_all};
use rustfs_utils::crypto::{hex, hex_sha256, hmac_sha256};
use s3s::Body;
pub const SIGN_V4_ALGORITHM: &str = "AWS4-HMAC-SHA256";
pub const SERVICE_TYPE_S3: &str = "s3";
@@ -270,7 +270,6 @@ pub fn pre_sign_v4(
.unwrap(),
);
*req.uri_mut() = Uri::from_parts(parts).unwrap();
req
@@ -282,10 +281,16 @@ fn _post_pre_sign_signature_v4(policy_base64: &str, t: OffsetDateTime, secret_ac
get_signature(signing_key, policy_base64)
}
fn _sign_v4_sts(req: request::Request<Body>, access_key_id: &str, secret_access_key: &str, location: &str) -> request::Request<Body> {
fn _sign_v4_sts(
req: request::Request<Body>,
access_key_id: &str,
secret_access_key: &str,
location: &str,
) -> request::Request<Body> {
sign_v4_inner(req, 0, access_key_id, secret_access_key, "", location, SERVICE_TYPE_STS, HeaderMap::new())
}
#[allow(clippy::too_many_arguments)]
fn sign_v4_inner(
mut req: request::Request<Body>,
content_len: i64,

View File

@@ -1016,7 +1016,6 @@ impl Operation for RemoveRemoteTargetHandler {
)));
};
let mut need_delete = true;
if let Some(arnstr) = querys.get("arn") {
@@ -1047,7 +1046,6 @@ impl Operation for RemoveRemoteTargetHandler {
error!("need delete target is {}", decoded_str);
bucket_targets::remove_bucket_target(bucket, arnstr).await;
}
}
// List bucket targets and return as JSON to client
// match bucket_targets::list_bucket_targets(bucket).await {

View File

@@ -453,13 +453,10 @@ impl Operation for ImportBucketMetadata {
// create bucket if not exists
if !bucket_metadatas.contains_key(bucket_name) {
if let Err(e) = store
.make_bucket(
bucket_name,
&MakeBucketOptions {
force_create: true,
..Default::default()
},
)
.make_bucket(bucket_name, &MakeBucketOptions {
force_create: true,
..Default::default()
})
.await
{
warn!("create bucket failed: {e}");

View File

@@ -445,20 +445,17 @@ impl Operation for ExportIam {
let users: HashMap<String, AddOrUpdateUserReq> = users
.into_iter()
.map(|(k, v)| {
(
k,
AddOrUpdateUserReq {
secret_key: v.credentials.secret_key,
status: {
if v.credentials.status == "off" {
AccountStatus::Disabled
} else {
AccountStatus::Enabled
}
},
policy: None,
(k, AddOrUpdateUserReq {
secret_key: v.credentials.secret_key,
status: {
if v.credentials.status == "off" {
AccountStatus::Disabled
} else {
AccountStatus::Enabled
}
},
)
policy: None,
})
})
.collect::<HashMap<String, AddOrUpdateUserReq>>();

View File

@@ -310,14 +310,11 @@ impl S3 for FS {
};
store
.make_bucket(
&bucket,
&MakeBucketOptions {
force_create: true,
lock_enabled: object_lock_enabled_for_bucket.is_some_and(|v| v),
..Default::default()
},
)
.make_bucket(&bucket, &MakeBucketOptions {
force_create: true,
lock_enabled: object_lock_enabled_for_bucket.is_some_and(|v| v),
..Default::default()
})
.await
.map_err(ApiError::from)?;
@@ -498,13 +495,10 @@ impl S3 for FS {
};
store
.delete_bucket(
&input.bucket,
&DeleteBucketOptions {
force: false,
..Default::default()
},
)
.delete_bucket(&input.bucket, &DeleteBucketOptions {
force: false,
..Default::default()
})
.await
.map_err(ApiError::from)?;
@@ -1976,7 +1970,7 @@ impl S3 for FS {
..
} = req.input;
let mut lr_retention = false;
let lr_retention = false;
/*let rcfg = metadata_sys::get_object_lock_config(&bucket).await;
if let Ok(rcfg) = rcfg {
if let Some(rule) = rcfg.0.rule {