* fix error

* fix

* fix cargo

* fix openssl cfg

* fix clippy

* fix clippy

* fix

* fix clippy

* fix clippy
This commit is contained in:
likewu
2025-06-24 10:27:41 +08:00
committed by GitHub
parent e8a5b5d12c
commit 364c3098b2
56 changed files with 460 additions and 432 deletions

View File

@@ -228,11 +228,7 @@ impl FileInfo {
}
pub fn get_etag(&self) -> Option<String> {
if let Some(meta) = self.metadata.get("etag") {
Some(meta.clone())
} else {
None
}
self.metadata.get("etag").cloned()
}
pub fn write_quorum(&self, quorum: usize) -> usize {

View File

@@ -2009,7 +2009,7 @@ impl MetaObject {
panic!(
"Invalid Tier Object delete marker versionId {} {}",
fi.tier_free_version_id(),
err.to_string()
err
);
}
let vid = vid.unwrap();
@@ -2038,7 +2038,7 @@ impl MetaObject {
let aa = [tier_key, tier_obj_key, tier_obj_vid_key];
for (k, v) in &self.meta_sys {
if aa.contains(&k) {
if aa.contains(k) {
free_entry
.delete_marker
.as_mut()

View File

@@ -115,7 +115,7 @@ pub fn must_get_local_ips() -> std::io::Result<Vec<IpAddr>> {
}
}
pub fn get_default_location(u: Url, region_override: &str) -> String {
pub fn get_default_location(_u: Url, _region_override: &str) -> String {
todo!();
}
@@ -136,7 +136,7 @@ pub fn get_endpoint_url(endpoint: &str, secure: bool) -> Result<Url, std::io::Er
pub const DEFAULT_DIAL_TIMEOUT: i64 = 5;
pub fn new_remotetarget_http_transport(insecure: bool) -> Builder<TokioExecutor> {
pub fn new_remotetarget_http_transport(_insecure: bool) -> Builder<TokioExecutor> {
todo!();
}
@@ -255,11 +255,8 @@ impl TryFrom<String> for XHost {
}
}
/// parses the address string, process the ":port" format for double-stack binding,
/// and resolve the host name or IP address. If the port is 0, an available port is assigned.
pub fn parse_and_resolve_address(addr_str: &str) -> std::io::Result<SocketAddr> {
let resolved_addr: SocketAddr = if let Some(port) = addr_str.strip_prefix(":") {
// Process the ":port" format for double stack binding
let port_str = port;
let port: u16 = port_str
.parse()
@@ -269,10 +266,8 @@ pub fn parse_and_resolve_address(addr_str: &str) -> std::io::Result<SocketAddr>
} else {
port
};
// Using IPv6 without address specified [::], it should handle both IPv4 and IPv6
SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), final_port)
} else {
// Use existing logic to handle regular address formats
let mut addr = check_local_server_addr(addr_str)?; // assume check_local_server_addr is available here
if addr.port() == 0 {
addr.set_port(get_available_port());

View File

@@ -1,7 +1,7 @@
#![allow(unsafe_code)] // TODO: audit unsafe code
use super::{DiskInfo, IOStats};
use std::io::{Error, ErrorKind};
use std::io::Error;
use std::mem;
use std::os::windows::ffi::OsStrExt;
use std::path::Path;
@@ -40,8 +40,7 @@ pub fn get_info(p: impl AsRef<Path>) -> std::io::Result<DiskInfo> {
let free = unsafe { *lp_total_number_of_free_bytes.QuadPart() };
if free > total {
return Err(Error::new(
ErrorKind::Other,
return Err(Error::other(
format!(
"detected free space ({}) > total drive space ({}), fs corruption at ({}). please run 'fsck'",
free,

View File

@@ -1,22 +1,9 @@
use std::time::Duration;
// MaxRetry is the maximum number of retries before stopping.
pub const MAX_RETRY: i64 = 10;
// MaxJitter will randomize over the full exponential backoff time
pub const MAX_JITTER: f64 = 1.0;
// NoJitter disables the use of jitter for randomizing the exponential backoff time
pub const NO_JITTER: f64 = 0.0;
// DefaultRetryUnit - default unit multiplicative per retry.
// defaults to 200 * time.Millisecond
//const DefaultRetryUnit = 200 * time.Millisecond;
// DefaultRetryCap - Each retry attempt never waits no longer than
// this maximum time duration.
//const DefaultRetryCap = time.Second;
/*
struct Delay {
when: Instant,
@@ -76,100 +63,6 @@ impl Stream for RetryTimer {
}
}*/
pub fn new_retry_timer(max_retry: i32, base_sleep: Duration, max_sleep: Duration, jitter: f64) -> Vec<i32> {
/*attemptCh := make(chan int)
exponentialBackoffWait := func(attempt int) time.Duration {
// normalize jitter to the range [0, 1.0]
if jitter < NoJitter {
jitter = NoJitter
}
if jitter > MaxJitter {
jitter = MaxJitter
}
// sleep = random_between(0, min(maxSleep, base * 2 ** attempt))
sleep := baseSleep * time.Duration(1<<uint(attempt))
if sleep > maxSleep {
sleep = maxSleep
}
if jitter != NoJitter {
sleep -= time.Duration(c.random.Float64() * float64(sleep) * jitter)
}
return sleep
}
go func() {
defer close(attemptCh)
for i := 0; i < maxRetry; i++ {
select {
case attemptCh <- i + 1:
case <-ctx.Done():
return
}
select {
case <-time.After(exponentialBackoffWait(i)):
case <-ctx.Done():
return
}
}
}()
return attemptCh*/
pub fn new_retry_timer(_max_retry: i32, _base_sleep: Duration, _max_sleep: Duration, _jitter: f64) -> Vec<i32> {
todo!();
}
/*var retryableS3Codes = map[string]struct{}{
"RequestError": {},
"RequestTimeout": {},
"Throttling": {},
"ThrottlingException": {},
"RequestLimitExceeded": {},
"RequestThrottled": {},
"InternalError": {},
"ExpiredToken": {},
"ExpiredTokenException": {},
"SlowDown": {},
}
fn isS3CodeRetryable(s3Code string) (ok bool) {
_, ok = retryableS3Codes[s3Code]
return ok
}
var retryableHTTPStatusCodes = map[int]struct{}{
http.StatusRequestTimeout: {},
429: {}, // http.StatusTooManyRequests is not part of the Go 1.5 library, yet
499: {}, // client closed request, retry. A non-standard status code introduced by nginx.
http.StatusInternalServerError: {},
http.StatusBadGateway: {},
http.StatusServiceUnavailable: {},
http.StatusGatewayTimeout: {},
520: {}, // It is used by Cloudflare as a catch-all response for when the origin server sends something unexpected.
// Add more HTTP status codes here.
}
fn isHTTPStatusRetryable(httpStatusCode int) (ok bool) {
_, ok = retryableHTTPStatusCodes[httpStatusCode]
return ok
}
fn isRequestErrorRetryable(ctx context.Context, err error) bool {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
// Retry if internal timeout in the HTTP call.
return ctx.Err() == nil
}
if ue, ok := err.(*url.Error); ok {
e := ue.Unwrap()
switch e.(type) {
// x509: certificate signed by unknown authority
case x509.UnknownAuthorityError:
return false
}
switch e.Error() {
case "http: server gave HTTP response to HTTPS client":
return false
}
}
return true
}*/
}

View File

@@ -23,7 +23,7 @@ use protos::{
};
use std::{
collections::{HashMap, HashSet},
time::{SystemTime, UNIX_EPOCH},
time::SystemTime,
};
use time::OffsetDateTime;
use tonic::Request;

View File

@@ -1,20 +1,27 @@
#![allow(unused_imports)]
#![allow(unused_variables)]
#![allow(unused_mut)]
#![allow(unused_assignments)]
#![allow(unused_must_use)]
#![allow(clippy::all)]
use async_channel::{Receiver as A_Receiver, Sender as A_Sender, bounded};
use futures::Future;
use http::HeaderMap;
use lazy_static::lazy_static;
use s3s::Body;
use sha2::{Digest, Sha256};
use std::any::{Any, TypeId};
use std::any::Any;
use std::collections::HashMap;
use std::env;
use std::io::{Cursor, Write};
use std::io::Write;
use std::pin::Pin;
use std::sync::atomic::{AtomicI32, AtomicI64, Ordering};
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::{Arc, Mutex};
use tokio::select;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::{RwLock, mpsc};
use tracing::{error, info, warn};
use tracing::{error, info};
use uuid::Uuid;
use xxhash_rust::xxh64;
@@ -48,7 +55,7 @@ pub type ExpiryOpType = Box<dyn ExpiryOp + Send + Sync + 'static>;
static XXHASH_SEED: u64 = 0;
const DISABLED: &str = "Disabled";
const _DISABLED: &str = "Disabled";
//pub const ERR_INVALID_STORAGECLASS: &str = "invalid storage class.";
pub const ERR_INVALID_STORAGECLASS: &str = "invalid tier.";
@@ -70,7 +77,7 @@ impl LifecycleSys {
Some(lc)
}
pub fn trace(oi: &ObjectInfo) -> TraceFn {
pub fn trace(_oi: &ObjectInfo) -> TraceFn {
todo!();
}
}
@@ -264,7 +271,7 @@ impl ExpiryState {
let task = NewerNoncurrentTask {
bucket: String::from(bucket),
versions: versions,
versions,
event: lc_event,
};
let wrkr = self.get_worker_ch(task.op_hash());
@@ -297,7 +304,7 @@ impl ExpiryState {
let mut state = GLOBAL_ExpiryState.write().await;
while state.tasks_tx.len() < n {
let (tx, mut rx) = mpsc::channel(10000);
let (tx, rx) = mpsc::channel(10000);
let api = api.clone();
let rx = Arc::new(tokio::sync::Mutex::new(rx));
state.tasks_tx.push(tx);
@@ -349,7 +356,7 @@ impl ExpiryState {
}
}
else if v.as_any().is::<NewerNoncurrentTask>() {
let v = v.as_any().downcast_ref::<NewerNoncurrentTask>().expect("err!");
let _v = v.as_any().downcast_ref::<NewerNoncurrentTask>().expect("err!");
//delete_object_versions(api, &v.bucket, &v.versions, v.event).await;
}
else if v.as_any().is::<Jentry>() {
@@ -357,7 +364,7 @@ impl ExpiryState {
}
else if v.as_any().is::<FreeVersionTask>() {
let v = v.as_any().downcast_ref::<FreeVersionTask>().expect("err!");
let oi = v.0.clone();
let _oi = v.0.clone();
}
else {
@@ -490,11 +497,9 @@ impl TransitionState {
GLOBAL_TransitionState.active_tasks.fetch_add(1, Ordering::SeqCst);
if let Err(err) = transition_object(api.clone(), &task.obj_info, LcAuditEvent::new(task.event.clone(), task.src.clone())).await {
if !is_err_version_not_found(&err) && !is_err_object_not_found(&err) && !is_network_or_host_down(&err.to_string(), false) {
if !err.to_string().contains("use of closed network connection") {
error!("Transition to {} failed for {}/{} version:{} with {}",
task.event.storage_class, task.obj_info.bucket, task.obj_info.name, task.obj_info.version_id.expect("err"), err.to_string());
}
if !is_err_version_not_found(&err) && !is_err_object_not_found(&err) && !is_network_or_host_down(&err.to_string(), false) && !err.to_string().contains("use of closed network connection") {
error!("Transition to {} failed for {}/{} version:{} with {}",
task.event.storage_class, task.obj_info.bucket, task.obj_info.name, task.obj_info.version_id.expect("err"), err.to_string());
}
} else {
let mut ts = TierStats {
@@ -655,7 +660,7 @@ pub async fn expire_transitioned_object(
api: Arc<ECStore>,
oi: &ObjectInfo,
lc_event: &lifecycle::Event,
src: &LcEventSrc,
_src: &LcEventSrc,
) -> Result<ObjectInfo, std::io::Error> {
//let traceFn = GLOBAL_LifecycleSys.trace(oi);
let mut opts = ObjectOptions {
@@ -747,7 +752,7 @@ pub async fn transition_object(api: Arc<ECStore>, oi: &ObjectInfo, lae: LcAuditE
api.transition_object(&oi.bucket, &oi.name, &opts).await
}
pub fn audit_tier_actions(api: ECStore, tier: &str, bytes: i64) -> TimeFn {
pub fn audit_tier_actions(_api: ECStore, _tier: &str, _bytes: i64) -> TimeFn {
todo!();
}
@@ -785,11 +790,11 @@ pub async fn get_transitioned_object_reader(
Ok(get_fn(reader, h))
}
pub fn post_restore_opts(r: http::Request<Body>, bucket: &str, object: &str) -> Result<ObjectOptions, std::io::Error> {
pub fn post_restore_opts(_r: http::Request<Body>, _bucket: &str, _object: &str) -> Result<ObjectOptions, std::io::Error> {
todo!();
}
pub fn put_restore_opts(bucket: &str, object: &str, rreq: &RestoreObjectRequest, oi: &ObjectInfo) -> ObjectOptions {
pub fn put_restore_opts(_bucket: &str, _object: &str, _rreq: &RestoreObjectRequest, _oi: &ObjectInfo) -> ObjectOptions {
todo!();
}

View File

@@ -1,3 +1,10 @@
#![allow(unused_imports)]
#![allow(unused_variables)]
#![allow(unused_mut)]
#![allow(unused_assignments)]
#![allow(unused_must_use)]
#![allow(clippy::all)]
use s3s::dto::{
BucketLifecycleConfiguration, ExpirationStatus, LifecycleExpiration, LifecycleRule, NoncurrentVersionTransition,
ObjectLockConfiguration, ObjectLockEnabled, Transition,
@@ -191,10 +198,8 @@ impl Lifecycle for BucketLifecycleConfiguration {
}
let rule_prefix = rule.prefix.as_ref().expect("err!");
if prefix.len() > 0 && rule_prefix.len() > 0 {
if !prefix.starts_with(rule_prefix) && !rule_prefix.starts_with(&prefix) {
continue;
}
if prefix.len() > 0 && rule_prefix.len() > 0 && !prefix.starts_with(rule_prefix) && !rule_prefix.starts_with(&prefix) {
continue;
}
let rule_noncurrent_version_expiration = rule.noncurrent_version_expiration.as_ref().expect("err!");
@@ -317,7 +322,7 @@ impl Lifecycle for BucketLifecycleConfiguration {
}
events.push(Event {
action: action,
action,
due: Some(now),
rule_id: "".into(),
noncurrent_days: 0,
@@ -420,25 +425,23 @@ impl Lifecycle for BucketLifecycleConfiguration {
if !obj.is_latest {
if let Some(ref noncurrent_version_transitions) = rule.noncurrent_version_transitions {
if let Some(ref storage_class) = noncurrent_version_transitions[0].storage_class {
if storage_class.as_str() != "" {
if !obj.delete_marker && obj.transition_status != TRANSITION_COMPLETE {
let due = rule.noncurrent_version_transitions.as_ref().unwrap()[0].next_due(obj);
if due.is_some()
&& (now.unix_timestamp() == 0 || now.unix_timestamp() > due.unwrap().unix_timestamp())
{
events.push(Event {
action: IlmAction::TransitionVersionAction,
rule_id: rule.id.clone().expect("err!"),
due,
storage_class: rule.noncurrent_version_transitions.as_ref().unwrap()[0]
.storage_class
.clone()
.unwrap()
.as_str()
.to_string(),
..Default::default()
});
}
if storage_class.as_str() != "" && !obj.delete_marker && obj.transition_status != TRANSITION_COMPLETE {
let due = rule.noncurrent_version_transitions.as_ref().unwrap()[0].next_due(obj);
if due.is_some()
&& (now.unix_timestamp() == 0 || now.unix_timestamp() > due.unwrap().unix_timestamp())
{
events.push(Event {
action: IlmAction::TransitionVersionAction,
rule_id: rule.id.clone().expect("err!"),
due,
storage_class: rule.noncurrent_version_transitions.as_ref().unwrap()[0]
.storage_class
.clone()
.unwrap()
.as_str()
.to_string(),
..Default::default()
});
}
}
}
@@ -449,17 +452,15 @@ impl Lifecycle for BucketLifecycleConfiguration {
if let Some(ref expiration) = rule.expiration {
if let Some(ref date) = expiration.date {
let date0 = OffsetDateTime::from(date.clone());
if date0.unix_timestamp() != 0 {
if now.unix_timestamp() == 0 || now.unix_timestamp() > date0.unix_timestamp() {
events.push(Event {
action: IlmAction::DeleteAction,
rule_id: rule.id.clone().expect("err!"),
due: Some(date0),
noncurrent_days: 0,
newer_noncurrent_versions: 0,
storage_class: "".into(),
});
}
if date0.unix_timestamp() != 0 && (now.unix_timestamp() == 0 || now.unix_timestamp() > date0.unix_timestamp()) {
events.push(Event {
action: IlmAction::DeleteAction,
rule_id: rule.id.clone().expect("err!"),
due: Some(date0),
noncurrent_days: 0,
newer_noncurrent_versions: 0,
storage_class: "".into(),
});
}
} else if let Some(days) = expiration.days {
if days != 0 {

View File

@@ -1,3 +1,10 @@
#![allow(unused_imports)]
#![allow(unused_variables)]
#![allow(unused_mut)]
#![allow(unused_assignments)]
#![allow(unused_must_use)]
#![allow(clippy::all)]
use s3s::dto::{LifecycleRuleFilter, Transition};
const ERR_TRANSITION_INVALID_DAYS: &str = "Days must be 0 or greater when used with Transition";

View File

@@ -1,3 +1,10 @@
#![allow(unused_imports)]
#![allow(unused_variables)]
#![allow(unused_mut)]
#![allow(unused_assignments)]
#![allow(unused_must_use)]
#![allow(clippy::all)]
use sha2::Sha256;
use std::collections::HashMap;

View File

@@ -1,3 +1,10 @@
#![allow(unused_imports)]
#![allow(unused_variables)]
#![allow(unused_mut)]
#![allow(unused_assignments)]
#![allow(unused_must_use)]
#![allow(clippy::all)]
use sha2::{Digest, Sha256};
use std::any::Any;
use std::io::{Cursor, Write};

View File

@@ -1,7 +1,5 @@
use std::any::{Any, TypeId};
use std::collections::HashMap;
use time::{OffsetDateTime, format_description};
use tracing::{error, warn};
use s3s::dto::{Date, ObjectLockLegalHold, ObjectLockLegalHoldStatus, ObjectLockRetention, ObjectLockRetentionMode};
use s3s::header::{X_AMZ_OBJECT_LOCK_LEGAL_HOLD, X_AMZ_OBJECT_LOCK_MODE, X_AMZ_OBJECT_LOCK_RETAIN_UNTIL_DATE};
@@ -72,7 +70,7 @@ pub fn get_object_legalhold_meta(meta: HashMap<String, String>) -> ObjectLockLeg
}
pub fn parse_ret_mode(mode_str: &str) -> ObjectLockRetentionMode {
let mut mode;
let mode;
match mode_str.to_uppercase().as_str() {
"GOVERNANCE" => {
mode = ObjectLockRetentionMode::from_static(ObjectLockRetentionMode::GOVERNANCE);
@@ -86,7 +84,7 @@ pub fn parse_ret_mode(mode_str: &str) -> ObjectLockRetentionMode {
}
pub fn parse_legalhold_status(hold_str: &str) -> ObjectLockLegalHoldStatus {
let mut st;
let st;
match hold_str {
"ON" => {
st = ObjectLockLegalHoldStatus::from_static(ObjectLockLegalHoldStatus::ON);

View File

@@ -1,7 +1,5 @@
use std::any::{Any, TypeId};
use std::sync::Arc;
use time::OffsetDateTime;
use tracing::{error, warn};
use s3s::dto::{DefaultRetention, ObjectLockLegalHoldStatus, ObjectLockRetentionMode};

View File

@@ -1,7 +1,13 @@
#![allow(clippy::map_entry)]
#![allow(unused_imports)]
#![allow(unused_variables)]
#![allow(unused_mut)]
#![allow(unused_assignments)]
#![allow(unused_must_use)]
#![allow(clippy::all)]
use lazy_static::lazy_static;
use std::ops::{BitAnd, BitOr};
use std::{collections::HashMap, sync::Arc};
use std::collections::HashMap;
use crate::client::{api_put_object::PutObjectOptions, api_s3_datatypes::ObjectPart};
use crate::{disk::DiskAPI, store_api::GetObjectReader};
@@ -206,7 +212,7 @@ impl ChecksumMode {
}
});
let c = self.base();
let mut crc_bytes = Vec::<u8>::with_capacity(p.len() * self.raw_byte_len() as usize);
let crc_bytes = Vec::<u8>::with_capacity(p.len() * self.raw_byte_len() as usize);
let mut h = self.hasher()?;
h.write(&crc_bytes);
Ok(Checksum {

View File

@@ -1,4 +1,11 @@
#![allow(clippy::map_entry)]
#![allow(unused_imports)]
#![allow(unused_variables)]
#![allow(unused_mut)]
#![allow(unused_assignments)]
#![allow(unused_must_use)]
#![allow(clippy::all)]
use bytes::Bytes;
use http::{HeaderMap, StatusCode};
use std::collections::HashMap;

View File

@@ -1,4 +1,11 @@
#![allow(clippy::map_entry)]
#![allow(unused_imports)]
#![allow(unused_variables)]
#![allow(unused_mut)]
#![allow(unused_assignments)]
#![allow(unused_must_use)]
#![allow(clippy::all)]
use http::StatusCode;
use serde::{Deserialize, Serialize};
use serde::{de::Deserializer, ser::Serializer};
@@ -7,7 +14,7 @@ use std::fmt::Display;
use s3s::Body;
use s3s::S3ErrorCode;
const REPORT_ISSUE: &str = "Please report this issue at https://github.com/rustfs/rustfs/issues.";
const _REPORT_ISSUE: &str = "Please report this issue at https://github.com/rustfs/rustfs/issues.";
#[derive(Serialize, Deserialize, Debug, Clone, thiserror::Error, PartialEq, Eq)]
#[serde(default, rename_all = "PascalCase")]
@@ -26,7 +33,7 @@ pub struct ErrorResponse {
pub status_code: StatusCode,
}
fn serialize_code<S>(data: &S3ErrorCode, s: S) -> Result<S::Ok, S::Error>
fn serialize_code<S>(_data: &S3ErrorCode, s: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{

View File

@@ -1,4 +1,11 @@
#![allow(clippy::map_entry)]
#![allow(unused_imports)]
#![allow(unused_variables)]
#![allow(unused_mut)]
#![allow(unused_assignments)]
#![allow(unused_must_use)]
#![allow(clippy::all)]
use bytes::Bytes;
use http::HeaderMap;
use std::io::Cursor;
@@ -169,7 +176,7 @@ impl Object {
if !self.is_started || !self.object_info_set {
let seek_req = GetRequest {
is_read_op: false,
offset: offset,
offset,
is_first_req: true,
..Default::default()
};

View File

@@ -1,4 +1,11 @@
#![allow(clippy::map_entry)]
#![allow(unused_imports)]
#![allow(unused_variables)]
#![allow(unused_mut)]
#![allow(unused_assignments)]
#![allow(unused_must_use)]
#![allow(clippy::all)]
use http::{HeaderMap, HeaderName, HeaderValue};
use std::collections::HashMap;
use time::OffsetDateTime;

View File

@@ -1,4 +1,11 @@
#![allow(clippy::map_entry)]
#![allow(unused_imports)]
#![allow(unused_variables)]
#![allow(unused_mut)]
#![allow(unused_assignments)]
#![allow(unused_must_use)]
#![allow(clippy::all)]
use bytes::Bytes;
use http::{HeaderMap, StatusCode};
use std::collections::HashMap;

View File

@@ -1,4 +1,11 @@
#![allow(clippy::map_entry)]
#![allow(unused_imports)]
#![allow(unused_variables)]
#![allow(unused_mut)]
#![allow(unused_assignments)]
#![allow(unused_must_use)]
#![allow(clippy::all)]
use bytes::Bytes;
use http::{HeaderMap, HeaderName, HeaderValue};
use std::{collections::HashMap, sync::Arc};
@@ -206,10 +213,8 @@ impl PutObjectOptions {
if let Ok(header_name) = HeaderName::from_bytes(k.as_bytes()) {
header.insert(header_name, HeaderValue::from_str(&v).unwrap());
}
} else {
if let Ok(header_name) = HeaderName::from_bytes(format!("x-amz-meta-{}", k).as_bytes()) {
header.insert(header_name, HeaderValue::from_str(&v).unwrap());
}
} else if let Ok(header_name) = HeaderName::from_bytes(format!("x-amz-meta-{}", k).as_bytes()) {
header.insert(header_name, HeaderValue::from_str(&v).unwrap());
}
}
@@ -239,7 +244,7 @@ impl TransitionClient {
self: Arc<Self>,
bucket_name: &str,
object_name: &str,
mut reader: ReaderImpl,
reader: ReaderImpl,
object_size: i64,
opts: &PutObjectOptions,
) -> Result<UploadInfo, std::io::Error> {
@@ -255,7 +260,7 @@ impl TransitionClient {
self: Arc<Self>,
bucket_name: &str,
object_name: &str,
mut reader: ReaderImpl,
reader: ReaderImpl,
size: i64,
opts: &PutObjectOptions,
) -> Result<UploadInfo, std::io::Error> {
@@ -346,7 +351,7 @@ impl TransitionClient {
let mut md5_base64: String = "".to_string();
if opts.send_content_md5 {
let mut md5_hasher = self.md5_hasher.lock().unwrap();
let mut hash = md5_hasher.as_mut().expect("err");
let hash = md5_hasher.as_mut().expect("err");
hash.write(&buf[..length]);
md5_base64 = base64_encode(hash.sum().as_bytes());
} else {
@@ -406,7 +411,7 @@ impl TransitionClient {
compl_multipart_upload.parts.sort();
let mut opts = PutObjectOptions {
let opts = PutObjectOptions {
//server_side_encryption: opts.server_side_encryption,
auto_checksum: opts.auto_checksum,
..Default::default()

View File

@@ -1,4 +1,11 @@
#![allow(clippy::map_entry)]
#![allow(unused_imports)]
#![allow(unused_variables)]
#![allow(unused_mut)]
#![allow(unused_assignments)]
#![allow(unused_must_use)]
#![allow(clippy::all)]
use crate::client::{
api_error_response::{err_entity_too_large, err_invalid_argument},
api_put_object::PutObjectOptions,
@@ -47,12 +54,10 @@ pub fn optimal_part_info(object_size: i64, configured_part_size: u64) -> Result<
)));
}
if !unknown_size {
if object_size > (configured_part_size as i64 * MAX_PARTS_COUNT) {
return Err(std::io::Error::other(err_invalid_argument(
"Part size * max_parts(10000) is lesser than input objectSize.",
)));
}
if !unknown_size && object_size > (configured_part_size as i64 * MAX_PARTS_COUNT) {
return Err(std::io::Error::other(err_invalid_argument(
"Part size * max_parts(10000) is lesser than input objectSize.",
)));
}
if (configured_part_size as i64) < ABS_MIN_PART_SIZE {

View File

@@ -1,4 +1,11 @@
#![allow(clippy::map_entry)]
#![allow(unused_imports)]
#![allow(unused_variables)]
#![allow(unused_mut)]
#![allow(unused_assignments)]
#![allow(unused_must_use)]
#![allow(clippy::all)]
use bytes::Bytes;
use http::{HeaderMap, HeaderName, HeaderValue, StatusCode};
use s3s::S3ErrorCode;
@@ -104,8 +111,8 @@ impl TransitionClient {
//let rd = newHook(bytes.NewReader(buf[..length]), opts.progress);
let rd = Bytes::from(buf.clone());
let mut md5_base64: String;
let mut sha256_hex: String;
let md5_base64: String;
let sha256_hex: String;
//if hash_sums["md5"] != nil {
md5_base64 = base64_encode(&hash_sums["md5"]);
@@ -166,7 +173,7 @@ impl TransitionClient {
}
compl_multipart_upload.parts.sort();
let mut opts = PutObjectOptions {
let opts = PutObjectOptions {
//server_side_encryption: opts.server_side_encryption,
auto_checksum: opts.auto_checksum,
..Default::default()
@@ -199,7 +206,7 @@ impl TransitionClient {
url_values.insert("versionId".to_string(), opts.internal.source_version_id.clone());
}
let mut custom_header = opts.header();
let custom_header = opts.header();
let mut req_metadata = RequestMetadata {
bucket_name: bucket_name.to_string(),
@@ -339,7 +346,7 @@ impl TransitionClient {
url_values.insert("uploadId".to_string(), upload_id.to_string());
let complete_multipart_upload_bytes = complete.marshal_msg()?.as_bytes().to_vec();
let mut headers = opts.header();
let headers = opts.header();
let complete_multipart_upload_buffer = Bytes::from(complete_multipart_upload_bytes);
let mut req_metadata = RequestMetadata {

View File

@@ -1,4 +1,11 @@
#![allow(clippy::map_entry)]
#![allow(unused_imports)]
#![allow(unused_variables)]
#![allow(unused_mut)]
#![allow(unused_assignments)]
#![allow(unused_must_use)]
#![allow(clippy::all)]
use bytes::Bytes;
use futures::future::join_all;
use http::{HeaderMap, HeaderName, HeaderValue, StatusCode};
@@ -41,7 +48,7 @@ impl TransitionClient {
self: Arc<Self>,
bucket_name: &str,
object_name: &str,
mut reader: ReaderImpl,
reader: ReaderImpl,
size: i64,
opts: &PutObjectOptions,
) -> Result<UploadInfo, std::io::Error> {
@@ -67,7 +74,7 @@ impl TransitionClient {
&self,
bucket_name: &str,
object_name: &str,
mut reader: ReaderImpl,
reader: ReaderImpl,
size: i64,
opts: &PutObjectOptions,
) -> Result<UploadInfo, std::io::Error> {
@@ -132,7 +139,7 @@ impl TransitionClient {
if opts.send_content_md5 {
let mut md5_hasher = self.md5_hasher.lock().unwrap();
let mut md5_hash = md5_hasher.as_mut().expect("err");
let md5_hash = md5_hasher.as_mut().expect("err");
md5_hash.reset();
md5_hash.write(&buf[..length]);
md5_base64 = base64_encode(md5_hash.sum().as_bytes());
@@ -173,15 +180,13 @@ impl TransitionClient {
total_uploaded_size += part_size as i64;
}
if size > 0 {
if total_uploaded_size != size {
return Err(std::io::Error::other(err_unexpected_eof(
total_uploaded_size,
size,
bucket_name,
object_name,
)));
}
if size > 0 && total_uploaded_size != size {
return Err(std::io::Error::other(err_unexpected_eof(
total_uploaded_size,
size,
bucket_name,
object_name,
)));
}
let mut compl_multipart_upload = CompleteMultipartUpload::default();
@@ -242,7 +247,7 @@ impl TransitionClient {
opts.user_metadata.remove("X-Amz-Checksum-Algorithm");
let mut total_uploaded_size: i64 = 0;
let mut parts_info = Arc::new(RwLock::new(HashMap::<i64, ObjectPart>::new()));
let parts_info = Arc::new(RwLock::new(HashMap::<i64, ObjectPart>::new()));
let n_buffers = opts.num_threads;
let (bufs_tx, mut bufs_rx) = mpsc::channel(n_buffers as usize);
@@ -315,7 +320,7 @@ impl TransitionClient {
if opts.send_content_md5 {
let mut md5_hasher = clone_self.md5_hasher.lock().unwrap();
let mut md5_hash = md5_hasher.as_mut().expect("err");
let md5_hash = md5_hasher.as_mut().expect("err");
md5_hash.write(&buf[..length]);
md5_base64 = base64_encode(md5_hash.sum().as_bytes());
}
@@ -357,7 +362,7 @@ impl TransitionClient {
let mut compl_multipart_upload = CompleteMultipartUpload::default();
let mut part_number: i64 = total_parts_count;
let part_number: i64 = total_parts_count;
let mut all_parts = Vec::<ObjectPart>::with_capacity(parts_info.read().unwrap().len());
for i in 1..part_number {
let part = parts_info.read().unwrap()[&i].clone();
@@ -396,7 +401,7 @@ impl TransitionClient {
&self,
bucket_name: &str,
object_name: &str,
mut reader: ReaderImpl,
reader: ReaderImpl,
size: i64,
opts: &PutObjectOptions,
) -> Result<UploadInfo, std::io::Error> {
@@ -405,7 +410,7 @@ impl TransitionClient {
opts.send_content_md5 = false;
}
let mut md5_base64: String = "".to_string();
let md5_base64: String = "".to_string();
let progress_reader = reader; //newHook(reader, opts.progress);
self.put_object_do(bucket_name, object_name, progress_reader, &md5_base64, "", size, &opts)
@@ -492,7 +497,7 @@ impl TransitionClient {
} else {
"".to_string()
},
size: size,
size,
expiration: exp_time,
expiration_rule_id: rule_id,
checksum_crc32: if let Some(h_checksum_crc32) = h.get(ChecksumMode::ChecksumCRC32.key()) {

View File

@@ -1,4 +1,11 @@
#![allow(clippy::map_entry)]
#![allow(unused_imports)]
#![allow(unused_variables)]
#![allow(unused_mut)]
#![allow(unused_assignments)]
#![allow(unused_must_use)]
#![allow(clippy::all)]
use bytes::Bytes;
use http::{HeaderMap, HeaderValue, Method, StatusCode};
use s3s::S3ErrorCode;
@@ -55,7 +62,7 @@ pub struct RemoveObjectOptions {
impl TransitionClient {
pub async fn remove_bucket_with_options(&self, bucket_name: &str, opts: &RemoveBucketOptions) -> Result<(), std::io::Error> {
let mut headers = HeaderMap::new();
let headers = HeaderMap::new();
/*if opts.force_delete {
headers.insert(rustFSForceDelete, "true");
}*/
@@ -189,7 +196,7 @@ impl TransitionClient {
objects_rx: Receiver<ObjectInfo>,
opts: RemoveObjectsOptions,
) -> Receiver<RemoveObjectResult> {
let (result_tx, mut result_rx) = mpsc::channel(1);
let (result_tx, result_rx) = mpsc::channel(1);
let self_clone = Arc::clone(&self);
let bucket_name_owned = bucket_name.to_string();
@@ -208,7 +215,7 @@ impl TransitionClient {
objects_rx: Receiver<ObjectInfo>,
opts: RemoveObjectsOptions,
) -> Receiver<RemoveObjectError> {
let (error_tx, mut error_rx) = mpsc::channel(1);
let (error_tx, error_rx) = mpsc::channel(1);
let self_clone = Arc::clone(&self);
let bucket_name_owned = bucket_name.to_string();

View File

@@ -1,4 +1,11 @@
#![allow(clippy::map_entry)]
#![allow(unused_imports)]
#![allow(unused_variables)]
#![allow(unused_mut)]
#![allow(unused_assignments)]
#![allow(unused_must_use)]
#![allow(clippy::all)]
use s3s::dto::Owner;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;

View File

@@ -1,4 +1,11 @@
#![allow(clippy::map_entry)]
#![allow(unused_imports)]
#![allow(unused_variables)]
#![allow(unused_mut)]
#![allow(unused_assignments)]
#![allow(unused_must_use)]
#![allow(clippy::all)]
use http::Request;
use hyper::StatusCode;
use hyper::body::Incoming;

View File

@@ -1,4 +1,10 @@
#![allow(clippy::map_entry)]
#![allow(unused_imports)]
#![allow(unused_variables)]
#![allow(unused_mut)]
#![allow(unused_assignments)]
#![allow(unused_must_use)]
use lazy_static::lazy_static;
use std::{collections::HashMap, sync::Arc};
use time::{format_description::FormatItem, macros::format_description};
@@ -20,6 +26,6 @@ pub const SIGN_V4_ALGORITHM: &str = "AWS4-HMAC-SHA256";
pub const ISO8601_DATEFORMAT: &[FormatItem<'_>] =
format_description!("[year]-[month]-[day]T[hour]:[minute]:[second].[subsecond]Z");
const GetObjectAttributesTags: &str = "ETag,Checksum,StorageClass,ObjectSize,ObjectParts";
const GetObjectAttributesMaxParts: i64 = 1000;
pub const GET_OBJECT_ATTRIBUTES_TAGS: &str = "ETag,Checksum,StorageClass,ObjectSize,ObjectParts";
pub const GET_OBJECT_ATTRIBUTES_MAX_PARTS: i64 = 1000;
const RUSTFS_BUCKET_SOURCE_MTIME: &str = "X-RustFs-Source-Mtime";

View File

@@ -1,3 +1,10 @@
#![allow(unused_imports)]
#![allow(unused_variables)]
#![allow(unused_mut)]
#![allow(unused_assignments)]
#![allow(unused_must_use)]
#![allow(clippy::all)]
use std::fmt::{Display, Formatter};
use time::OffsetDateTime;
@@ -22,7 +29,7 @@ pub struct Credentials<P: Provider + Default> {
impl<P: Provider + Default> Credentials<P> {
pub fn new(provider: P) -> Self {
Self {
provider: provider,
provider,
force_refresh: true,
..Default::default()
}

View File

@@ -1,4 +1,11 @@
#![allow(clippy::map_entry)]
#![allow(unused_imports)]
#![allow(unused_variables)]
#![allow(unused_mut)]
#![allow(unused_assignments)]
#![allow(unused_must_use)]
#![allow(clippy::all)]
use http::HeaderMap;
use std::io::Cursor;
use std::{collections::HashMap, sync::Arc};
@@ -55,8 +62,8 @@ fn part_number_to_rangespec(oi: ObjectInfo, part_number: usize) -> Option<HTTPRa
}
Some(HTTPRangeSpec {
start: start,
end: end,
start,
end,
is_suffix_length: false,
})
}

View File

@@ -6,12 +6,9 @@ use crate::store::ECStore;
use crate::store_api::{ObjectOptions, ObjectToDelete};
use lock::local_locker::MAX_DELETE_LIST;
pub async fn delete_object_versions(api: ECStore, bucket: &str, to_del: &[ObjectToDelete], lc_event: lifecycle::Event) {
pub async fn delete_object_versions(api: ECStore, bucket: &str, to_del: &[ObjectToDelete], _lc_event: lifecycle::Event) {
let mut remaining = to_del;
loop {
if remaining.len() <= 0 {
break;
};
let mut to_del = remaining;
if to_del.len() > MAX_DELETE_LIST {
remaining = &to_del[MAX_DELETE_LIST..];
@@ -20,7 +17,7 @@ pub async fn delete_object_versions(api: ECStore, bucket: &str, to_del: &[Object
remaining = &[];
}
let vc = BucketVersioningSys::get(bucket).await.expect("err!");
let deleted_objs = api.delete_objects(
let _deleted_objs = api.delete_objects(
bucket,
to_del.to_vec(),
ObjectOptions {

View File

@@ -1,4 +1,11 @@
#![allow(clippy::map_entry)]
#![allow(unused_imports)]
#![allow(unused_variables)]
#![allow(unused_mut)]
#![allow(unused_assignments)]
#![allow(unused_must_use)]
#![allow(clippy::all)]
use bytes::Bytes;
use futures::Future;
use http::{HeaderMap, HeaderName};
@@ -300,7 +307,7 @@ impl TransitionClient {
return Err(std::io::Error::other(s));
}
let mut retryable: bool;
let retryable: bool;
//let mut body_seeker: BufferReader;
let mut req_retry = self.max_retries;
let mut resp: http::Response<Body>;
@@ -348,11 +355,9 @@ impl TransitionClient {
bucket_loc_cache.set(&metadata.bucket_name, &err_response.region);
//continue;
}
} else {
if err_response.region != metadata.bucket_location {
metadata.bucket_location = err_response.region.clone();
//continue;
}
} else if err_response.region != metadata.bucket_location {
metadata.bucket_location = err_response.region.clone();
//continue;
}
return Err(std::io::Error::other(err_response));
}
@@ -374,10 +379,8 @@ impl TransitionClient {
metadata: &mut RequestMetadata,
) -> Result<http::Request<Body>, std::io::Error> {
let location = metadata.bucket_location.clone();
if location == "" {
if metadata.bucket_name != "" {
let location = self.get_bucket_location(&metadata.bucket_name).await?;
}
if location == "" && metadata.bucket_name != "" {
let location = self.get_bucket_location(&metadata.bucket_name).await?;
}
let is_makebucket = metadata.object_name == "" && method == http::Method::PUT && metadata.query_values.len() == 0;
@@ -538,7 +541,7 @@ impl TransitionClient {
}
pub fn set_user_agent(&self, req: &mut Builder) {
let mut headers = req.headers_mut().expect("err");
let headers = req.headers_mut().expect("err");
headers.insert("User-Agent", C_USER_AGENT.parse().expect("err"));
/*if self.app_info.app_name != "" && self.app_info.app_version != "" {
headers.insert("User-Agent", C_USER_AGENT+" "+self.app_info.app_name+"/"+self.app_info.app_version);
@@ -771,7 +774,7 @@ impl TransitionCore {
part_number: part_id,
md5_base64: opts.md5_base64,
sha256_hex: opts.sha256_hex,
size: size,
size,
//sse: opts.sse,
stream_sha256: !opts.disable_content_sha256,
custom_header: opts.custom_header,

View File

@@ -737,7 +737,7 @@ pub fn to_object_err(err: Error, params: Vec<&str>) -> Error {
}
}
pub fn is_network_or_host_down(err: &str, expect_timeouts: bool) -> bool {
pub fn is_network_or_host_down(err: &str, _expect_timeouts: bool) -> bool {
err.contains("Connection closed by foreign host")
|| err.contains("TLS handshake timeout")
|| err.contains("i/o timeout")
@@ -809,7 +809,7 @@ pub fn error_resp_to_object_err(err: ErrorResponse, params: Vec<&str>) -> std::i
return std::io::Error::other(ObjectApiError::BackendDown(format!("{}", err)));
}
let mut err_ = std::io::Error::other(err.to_string());
let err_ = std::io::Error::other(err.to_string());
let r_err = err;
let mut err = err_;
let bucket = bucket.to_string();
@@ -908,8 +908,8 @@ pub fn storage_to_object_err(err: Error, params: Vec<&str>) -> S3Error {
StorageError::MethodNotAllowed => S3Error::with_message(
S3ErrorCode::MethodNotAllowed,
ObjectApiError::MethodNotAllowed(GenericError {
bucket: bucket,
object: object,
bucket,
object,
..Default::default()
})
.to_string(),

View File

@@ -1,3 +1,5 @@
#![allow(unused_variables)]
#[derive(Default)]
pub enum EventName {
ObjectAccessedGet,
@@ -32,7 +34,7 @@ pub enum EventName {
ObjectLargeVersions,
PrefixManyFolders,
ILMDelMarkerExpirationDelete,
objectSingleTypesEnd,
ObjectSingleTypesEnd,
ObjectAccessedAll,
ObjectCreatedAll,
ObjectRemovedAll,

View File

@@ -1,3 +1,5 @@
#![allow(clippy::all)]
pub struct TargetID {
id: String,
name: String,

View File

@@ -1,3 +1,6 @@
#![allow(unused_imports)]
#![allow(unused_variables)]
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;

View File

@@ -12,7 +12,6 @@ use std::{
time::{Duration, SystemTime},
};
use common::defer;
use time::{self, OffsetDateTime};
use super::{
@@ -30,15 +29,14 @@ use crate::event::name::EventName;
use crate::{
bucket::{
lifecycle::{
bucket_lifecycle_audit::{LcAuditEvent, LcEventSrc},
bucket_lifecycle_ops::{self, GLOBAL_ExpiryState, GLOBAL_TransitionState, LifecycleOps, expire_transitioned_object},
bucket_lifecycle_audit::LcEventSrc,
bucket_lifecycle_ops::{GLOBAL_ExpiryState, GLOBAL_TransitionState, LifecycleOps, expire_transitioned_object},
lifecycle::{self, ExpirationOptions, Lifecycle},
},
metadata_sys,
},
event_notification::{EventArgs, send_event},
global::GLOBAL_LocalNodeName,
heal::data_scanner,
store_api::{ObjectOptions, ObjectToDelete, StorageAPI},
};
use crate::{
@@ -81,7 +79,7 @@ use rustfs_utils::path::encode_dir_object;
use rustfs_utils::path::{SLASH_SEPARATOR, path_join, path_to_bucket_object, path_to_bucket_object_with_base_path};
use s3s::dto::{
BucketLifecycleConfiguration, DefaultRetention, ExpirationStatus, LifecycleRule, ReplicationConfiguration,
ReplicationRuleStatus, VersioningConfiguration,
ReplicationRuleStatus,
};
use serde::{Deserialize, Serialize};
use tokio::{
@@ -547,7 +545,7 @@ impl ScannerItem {
info!("apply_lifecycle debug");
}
if self.lifecycle.is_none() {
return (lifecycle::IlmAction::NoneAction, size as i64);
return (lifecycle::IlmAction::NoneAction, size);
}
let version_id = oi.version_id;
@@ -558,16 +556,12 @@ impl ScannerItem {
if !is_meta_bucketname(&self.bucket) {
vc = Some(BucketVersioningSys::get(&self.bucket).await.unwrap());
lr = BucketObjectLockSys::get(&self.bucket).await;
rcfg = if let Ok(replication_config) = metadata_sys::get_replication_config(&self.bucket).await {
Some(replication_config)
} else {
None
};
rcfg = (metadata_sys::get_replication_config(&self.bucket).await).ok();
}
let lc_evt = eval_action_from_lifecycle(self.lifecycle.as_ref().expect("err"), lr, rcfg, oi).await;
if self.debug {
if !version_id.is_none() {
if version_id.is_some() {
info!(
"lifecycle: {} (version-id={}), Initial scan: {}",
self.object_path().to_string_lossy().to_string(),
@@ -598,7 +592,7 @@ impl ScannerItem {
}
apply_lifecycle_action(&lc_evt, &LcEventSrc::Scanner, oi).await;
(lc_evt.action, size as i64)
(lc_evt.action, size)
}
pub async fn apply_versions_actions(&self, fivs: &[FileInfo]) -> Result<Vec<ObjectInfo>> {
@@ -627,7 +621,7 @@ impl ScannerItem {
} else {
false
};
let vcfg = BucketVersioningSys::get(&self.bucket).await?;
let _vcfg = BucketVersioningSys::get(&self.bucket).await?;
let versioned = match BucketVersioningSys::get(&self.bucket).await {
Ok(vcfg) => vcfg.versioned(self.object_path().to_str().unwrap_or_default()),
@@ -709,7 +703,7 @@ impl ScannerItem {
});
}
if to_del.len() > 0 {
if !to_del.is_empty() {
let mut expiry_state = GLOBAL_ExpiryState.write().await;
expiry_state.enqueue_by_newer_noncurrent(&self.bucket, to_del, event).await;
}
@@ -721,7 +715,7 @@ impl ScannerItem {
pub async fn apply_actions(&mut self, oi: &ObjectInfo, _size_s: &mut SizeSummary) -> (bool, i64) {
let done = ScannerMetrics::time(ScannerMetric::Ilm);
let (action, size) = self.apply_lifecycle(oi).await;
let (action, _size) = self.apply_lifecycle(oi).await;
info!(
"apply_actions {} {} {:?} {:?}",
@@ -1591,7 +1585,7 @@ pub async fn eval_action_from_lifecycle(
}
if lock_enabled && enforce_retention_for_deletion(oi) {
//if serverDebugLog {
if !oi.version_id.is_none() {
if oi.version_id.is_some() {
info!("lifecycle: {} v({}) is locked, not deleting", oi.name, oi.version_id.expect("err"));
} else {
info!("lifecycle: {} is locked, not deleting", oi.name);
@@ -1629,7 +1623,7 @@ pub async fn apply_expiry_on_transitioned_object(
if let Err(_err) = expire_transitioned_object(api, oi, lc_event, src).await {
return false;
}
time_ilm(1);
let _ = time_ilm(1);
true
}
@@ -1638,7 +1632,7 @@ pub async fn apply_expiry_on_non_transitioned_objects(
api: Arc<ECStore>,
oi: &ObjectInfo,
lc_event: &lifecycle::Event,
src: &LcEventSrc,
_src: &LcEventSrc,
) -> bool {
let mut opts = ObjectOptions {
expiration: ExpirationOptions { expire: true },
@@ -1656,8 +1650,6 @@ pub async fn apply_expiry_on_non_transitioned_objects(
opts.delete_prefix = true;
opts.delete_prefix_object = true;
}
let dobj: ObjectInfo;
//let err: Error;
let time_ilm = ScannerMetrics::time_ilm(lc_event.action.clone());
@@ -1665,7 +1657,7 @@ pub async fn apply_expiry_on_non_transitioned_objects(
.delete_object(&oi.bucket, &encode_dir_object(&oi.name), opts)
.await
.unwrap();
if dobj.name == "" {
if dobj.name.is_empty() {
dobj = oi.clone();
}
@@ -1695,7 +1687,7 @@ pub async fn apply_expiry_on_non_transitioned_objects(
if lc_event.action.delete_all() {
num_versions = oi.num_versions as u64;
}
time_ilm(num_versions);
let _ = time_ilm(num_versions);
}
true

View File

@@ -12,7 +12,6 @@ use std::{
time::{Duration, SystemTime},
};
use tokio::sync::{Mutex, RwLock};
use tracing::debug;
use super::data_scanner::CurrentScannerCycle;
use crate::bucket::lifecycle::lifecycle;

View File

@@ -173,9 +173,9 @@ impl AllTierStats {
stats.insert(
tier.clone(),
TierStats {
total_size: st.total_size.clone(),
num_versions: st.num_versions.clone(),
num_objects: st.num_objects.clone(),
total_size: st.total_size,
num_versions: st.num_versions,
num_objects: st.num_objects,
},
);
}

View File

@@ -1,3 +1,6 @@
#![allow(unused_imports)]
#![allow(unused_variables)]
use crate::bitrot::{create_bitrot_reader, create_bitrot_writer};
use crate::bucket::lifecycle::lifecycle::TRANSITION_COMPLETE;
use crate::client::{object_api_utils::extract_etag, transition_api::ReaderImpl};
@@ -3723,7 +3726,7 @@ impl SetDisks {
oi.user_defined.remove(X_AMZ_RESTORE.as_str());
let version_id = oi.version_id.clone().map(|v| v.to_string());
let version_id = oi.version_id.map(|v| v.to_string());
let obj = self
.copy_object(
bucket,
@@ -3736,7 +3739,7 @@ impl SetDisks {
..Default::default()
},
&ObjectOptions {
version_id: version_id,
version_id,
..Default::default()
},
)
@@ -4245,7 +4248,7 @@ impl StorageAPI for SetDisks {
futures.push(async move {
if let Some(disk) = disk {
match disk
.delete_version(&bucket, &object, fi.clone(), force_del_marker, DeleteOptions::default())
.delete_version(bucket, object, fi.clone(), force_del_marker, DeleteOptions::default())
.await
{
Ok(r) => Ok(r),
@@ -4585,7 +4588,7 @@ impl StorageAPI for SetDisks {
//defer lk.Unlock(lkctx)
}*/
let (mut fi, mut meta_arr, mut online_disks) = self.get_object_fileinfo(&bucket, &object, &opts, true).await?;
let (mut fi, meta_arr, online_disks) = self.get_object_fileinfo(bucket, object, opts, true).await?;
/*if err != nil {
return Err(to_object_err(err, vec![bucket, object]));
}*/
@@ -4596,7 +4599,7 @@ impl StorageAPI for SetDisks {
return Err(to_object_err(ERR_METHOD_NOT_ALLOWED, vec![bucket, object]));
}*/
if !opts.mod_time.expect("err").unix_timestamp() == fi.mod_time.as_ref().expect("err").unix_timestamp()
|| !(opts.transition.etag == extract_etag(&fi.metadata))
|| opts.transition.etag != extract_etag(&fi.metadata)
{
return Err(to_object_err(Error::from(DiskError::FileNotFound), vec![bucket, object]));
}
@@ -4622,7 +4625,7 @@ impl StorageAPI for SetDisks {
}
let dest_obj = dest_obj.unwrap();
let oi = ObjectInfo::from_file_info(&fi, &bucket, &object, opts.versioned || opts.version_suspended);
let oi = ObjectInfo::from_file_info(&fi, bucket, object, opts.versioned || opts.version_suspended);
let (pr, mut pw) = tokio::io::duplex(fi.erasure.block_size);
//let h = HeaderMap::new();
@@ -4657,7 +4660,7 @@ impl StorageAPI for SetDisks {
});
let rv = tgt_client
.put_with_meta(&dest_obj, reader, fi.size as i64, {
.put_with_meta(&dest_obj, reader, fi.size, {
let mut m = HashMap::<String, String>::new();
m.insert("name".to_string(), object.to_string());
m
@@ -4672,7 +4675,7 @@ impl StorageAPI for SetDisks {
fi.transition_status = TRANSITION_COMPLETE.to_string();
fi.transitioned_objname = dest_obj;
fi.transition_tier = opts.transition.tier.clone();
fi.transition_version_id = if rv == "" { None } else { Some(Uuid::parse_str(&rv)?) };
fi.transition_version_id = if rv.is_empty() { None } else { Some(Uuid::parse_str(&rv)?) };
let mut event_name = EventName::ObjectTransitionComplete.as_ref();
let disks = self.get_disks(0, 0).await?;
@@ -4687,7 +4690,7 @@ impl StorageAPI for SetDisks {
continue;
}
}
self.add_partial(bucket, object, &opts.version_id.as_ref().expect("err"))
let _ = self.add_partial(bucket, object, opts.version_id.as_ref().expect("err"))
.await;
break;
}
@@ -4716,11 +4719,11 @@ impl StorageAPI for SetDisks {
Err(rerr.unwrap())
};
let mut oi = ObjectInfo::default();
let fi = self.get_object_fileinfo(&bucket, &object, &opts, true).await;
let fi = self.get_object_fileinfo(bucket, object, opts, true).await;
if let Err(err) = fi {
return set_restore_header_fn(&mut oi, Some(to_object_err(err, vec![bucket, object]))).await;
}
let (mut actual_fi, _, _) = fi.unwrap();
let (actual_fi, _, _) = fi.unwrap();
oi = ObjectInfo::from_file_info(&actual_fi, bucket, object, opts.versioned || opts.version_suspended);
let ropts = put_restore_opts(bucket, object, &opts.transition.restore_request, &oi);

View File

@@ -461,7 +461,7 @@ impl StorageAPI for Sets {
}
#[tracing::instrument(skip(self))]
async fn delete_object_version(&self, bucket: &str, object: &str, fi: &FileInfo, force_del_marker: bool) -> Result<()> {
async fn delete_object_version(&self, bucket: &str, object: &str, fi: &FileInfo, _force_del_marker: bool) -> Result<()> {
unimplemented!()
}
@@ -585,8 +585,7 @@ impl StorageAPI for Sets {
#[tracing::instrument(skip(self))]
async fn add_partial(&self, bucket: &str, object: &str, version_id: &str) -> Result<()> {
self.get_disks_by_key(object).add_partial(bucket, object, version_id).await;
Ok(())
self.get_disks_by_key(object).add_partial(bucket, object, version_id).await
}
#[tracing::instrument(skip(self))]

View File

@@ -1,22 +1,16 @@
use bytes::{Bytes, BytesMut};
use futures::prelude::*;
use futures::task;
use http::HeaderMap;
use http::Uri;
use http::header::TRAILER;
#![allow(unused_imports)]
#![allow(unused_variables)]
#![allow(unused_mut)]
#![allow(unused_assignments)]
#![allow(unused_must_use)]
#![allow(clippy::all)]
use http::request::{self, Request};
use hyper::Method;
use lazy_static::lazy_static;
use std::collections::HashMap;
use std::fmt::Write;
use std::pin::Pin;
use std::sync::Mutex;
use stdx::str::StrExt;
use time::{OffsetDateTime, format_description, macros::datetime, macros::format_description};
use tracing::{debug, error, info, warn};
use time::{OffsetDateTime, macros::format_description};
use super::request_signature_v4::{SERVICE_TYPE_S3, get_scope, get_signature, get_signing_key};
use crate::client::constants::UNSIGNED_PAYLOAD;
use rustfs_utils::{
crypto::{hex, hex_sha256, hex_sha256_chunk, hmac_sha256},
hash::EMPTY_STRING_SHA256_HASH,

View File

@@ -1,3 +1,9 @@
#![allow(unused_imports)]
#![allow(unused_variables)]
#![allow(unused_mut)]
#![allow(unused_assignments)]
#![allow(unused_must_use)]
use http::request;
use time::OffsetDateTime;

View File

@@ -1,3 +1,10 @@
#![allow(unused_imports)]
#![allow(unused_variables)]
#![allow(unused_mut)]
#![allow(unused_assignments)]
#![allow(unused_must_use)]
#![allow(clippy::all)]
use bytes::{Bytes, BytesMut};
use http::request;
use hyper::Uri;
@@ -117,14 +124,14 @@ fn pre_string_to_sign_v2(req: &request::Builder, virtual_host: bool) -> String {
}
fn write_pre_sign_v2_headers(buf: &mut BytesMut, req: &request::Builder) {
buf.write_str(req.method_ref().unwrap().as_str());
buf.write_char('\n');
buf.write_str(req.headers_ref().unwrap().get("Content-Md5").unwrap().to_str().unwrap());
buf.write_char('\n');
buf.write_str(req.headers_ref().unwrap().get("Content-Type").unwrap().to_str().unwrap());
buf.write_char('\n');
buf.write_str(req.headers_ref().unwrap().get("Expires").unwrap().to_str().unwrap());
buf.write_char('\n');
let _ = buf.write_str(req.method_ref().unwrap().as_str());
let _ = buf.write_char('\n');
let _ = buf.write_str(req.headers_ref().unwrap().get("Content-Md5").unwrap().to_str().unwrap());
let _ = buf.write_char('\n');
let _ = buf.write_str(req.headers_ref().unwrap().get("Content-Type").unwrap().to_str().unwrap());
let _ = buf.write_char('\n');
let _ = buf.write_str(req.headers_ref().unwrap().get("Expires").unwrap().to_str().unwrap());
let _ = buf.write_char('\n');
}
fn string_to_sign_v2(req: &request::Builder, virtual_host: bool) -> String {
@@ -136,14 +143,14 @@ fn string_to_sign_v2(req: &request::Builder, virtual_host: bool) -> String {
}
fn write_sign_v2_headers(buf: &mut BytesMut, req: &request::Builder) {
buf.write_str(req.method_ref().unwrap().as_str());
buf.write_char('\n');
buf.write_str(req.headers_ref().unwrap().get("Content-Md5").unwrap().to_str().unwrap());
buf.write_char('\n');
buf.write_str(req.headers_ref().unwrap().get("Content-Type").unwrap().to_str().unwrap());
buf.write_char('\n');
buf.write_str(req.headers_ref().unwrap().get("Date").unwrap().to_str().unwrap());
buf.write_char('\n');
let _ = buf.write_str(req.method_ref().unwrap().as_str());
let _ = buf.write_char('\n');
let _ = buf.write_str(req.headers_ref().unwrap().get("Content-Md5").unwrap().to_str().unwrap());
let _ = buf.write_char('\n');
let _ = buf.write_str(req.headers_ref().unwrap().get("Content-Type").unwrap().to_str().unwrap());
let _ = buf.write_char('\n');
let _ = buf.write_str(req.headers_ref().unwrap().get("Date").unwrap().to_str().unwrap());
let _ = buf.write_char('\n');
}
fn write_canonicalized_headers(buf: &mut BytesMut, req: &request::Builder) {
@@ -165,15 +172,15 @@ fn write_canonicalized_headers(buf: &mut BytesMut, req: &request::Builder) {
}
proto_headers.sort();
for k in proto_headers {
buf.write_str(&k);
buf.write_char(':');
let _ = buf.write_str(&k);
let _ = buf.write_char(':');
for (idx, v) in vals[&k].iter().enumerate() {
if idx > 0 {
buf.write_char(',');
let _ = buf.write_char(',');
}
buf.write_str(v);
let _ = buf.write_str(v);
}
buf.write_char('\n');
let _ = buf.write_char('\n');
}
}
@@ -203,7 +210,7 @@ const INCLUDED_QUERY: &[&str] = &[
fn write_canonicalized_resource(buf: &mut BytesMut, req: &request::Builder, virtual_host: bool) {
let request_url = req.uri_ref().unwrap();
buf.write_str(&encode_url2path(req, virtual_host));
let _ = buf.write_str(&encode_url2path(req, virtual_host));
if request_url.query().unwrap() != "" {
let mut n: i64 = 0;
let result = serde_urlencoded::from_str::<HashMap<String, Vec<String>>>(req.uri_ref().unwrap().query().unwrap());
@@ -214,14 +221,14 @@ fn write_canonicalized_resource(buf: &mut BytesMut, req: &request::Builder, virt
n += 1;
match n {
1 => {
buf.write_char('?');
let _ = buf.write_char('?');
}
_ => {
buf.write_char('&');
buf.write_str(resource);
let _ = buf.write_char('&');
let _ = buf.write_str(resource);
if vv[0].len() > 0 {
buf.write_char('=');
buf.write_str(&vv[0]);
let _ = buf.write_char('=');
let _ = buf.write_str(&vv[0]);
}
}
}

View File

@@ -1,3 +1,10 @@
#![allow(unused_imports)]
#![allow(unused_variables)]
#![allow(unused_mut)]
#![allow(unused_assignments)]
#![allow(unused_must_use)]
#![allow(clippy::all)]
use bytes::{Bytes, BytesMut};
use http::HeaderMap;
use http::Uri;
@@ -105,22 +112,22 @@ fn get_canonical_headers(req: &request::Builder, ignored_headers: &HashMap<Strin
let mut buf = BytesMut::new();
for k in headers {
buf.write_str(&k);
buf.write_char(':');
let _ = buf.write_str(&k);
let _ = buf.write_char(':');
let k: &str = &k;
match k {
"host" => {
buf.write_str(&get_host_addr(&req));
buf.write_char('\n');
let _ = buf.write_str(&get_host_addr(&req));
let _ = buf.write_char('\n');
}
_ => {
for (idx, v) in vals[k].iter().enumerate() {
if idx > 0 {
buf.write_char(',');
let _ = buf.write_char(',');
}
buf.write_str(&sign_v4_trim_all(v));
let _ = buf.write_str(&sign_v4_trim_all(v));
}
buf.write_char('\n');
let _ = buf.write_char('\n');
}
}
}
@@ -174,12 +181,12 @@ fn get_canonical_request(req: &request::Builder, ignored_headers: &HashMap<Strin
fn get_string_to_sign_v4(t: OffsetDateTime, location: &str, canonical_request: &str, service_type: &str) -> String {
let mut string_to_sign = SIGN_V4_ALGORITHM.to_string();
string_to_sign.push_str("\n");
string_to_sign.push('\n');
let format = format_description!("[year][month][day]T[hour][minute][second]Z");
string_to_sign.push_str(&t.format(&format).unwrap());
string_to_sign.push_str("\n");
string_to_sign.push('\n');
string_to_sign.push_str(&get_scope(location, t, service_type));
string_to_sign.push_str("\n");
string_to_sign.push('\n');
string_to_sign.push_str(&hex_sha256(canonical_request.as_bytes(), |s| s.to_string()));
string_to_sign
}
@@ -639,18 +646,18 @@ mod tests {
let mut headers = req.headers_mut().expect("err");
headers.insert("host", "examplebucket.s3.amazonaws.com".parse().unwrap());
req = pre_sign_v4(req, &access_key_id, &secret_access_key, "", &region, 86400, t);
req = pre_sign_v4(req, access_key_id, secret_access_key, "", region, 86400, t);
let mut canonical_request = req.method_ref().unwrap().as_str().to_string();
canonical_request.push_str("\n");
canonical_request.push('\n');
canonical_request.push_str(req.uri_ref().unwrap().path());
canonical_request.push_str("\n");
canonical_request.push('\n');
canonical_request.push_str(req.uri_ref().unwrap().query().unwrap());
canonical_request.push_str("\n");
canonical_request.push('\n');
canonical_request.push_str(&get_canonical_headers(&req, &v4_ignored_headers));
canonical_request.push_str("\n");
canonical_request.push('\n');
canonical_request.push_str(&get_signed_headers(&req, &v4_ignored_headers));
canonical_request.push_str("\n");
canonical_request.push('\n');
canonical_request.push_str(&get_hashed_payload(&req));
//println!("canonical_request: \n{}\n", canonical_request);
assert_eq!(
@@ -686,18 +693,18 @@ mod tests {
let mut headers = req.headers_mut().expect("err");
headers.insert("host", "192.168.1.11:9020".parse().unwrap());
req = pre_sign_v4(req, &access_key_id, &secret_access_key, "", &region, 86400, t);
req = pre_sign_v4(req, access_key_id, secret_access_key, "", region, 86400, t);
let mut canonical_request = req.method_ref().unwrap().as_str().to_string();
canonical_request.push_str("\n");
canonical_request.push('\n');
canonical_request.push_str(req.uri_ref().unwrap().path());
canonical_request.push_str("\n");
canonical_request.push('\n');
canonical_request.push_str(req.uri_ref().unwrap().query().unwrap());
canonical_request.push_str("\n");
canonical_request.push('\n');
canonical_request.push_str(&get_canonical_headers(&req, &v4_ignored_headers));
canonical_request.push_str("\n");
canonical_request.push('\n');
canonical_request.push_str(&get_signed_headers(&req, &v4_ignored_headers));
canonical_request.push_str("\n");
canonical_request.push('\n');
canonical_request.push_str(&get_hashed_payload(&req));
//println!("canonical_request: \n{}\n", canonical_request);
assert_eq!(

View File

@@ -9,8 +9,10 @@ pub fn get_host_addr(req: &request::Builder) -> String {
} else {
req_host = uri.host().unwrap().to_string();
}
if host.is_some() && req_host != host.unwrap().to_str().unwrap().to_string() {
return host.unwrap().to_str().unwrap().to_string();
if let Some(host) = host {
if req_host != host.to_str().unwrap().to_string() {
return host.to_str().unwrap().to_string();
}
}
/*if req.uri_ref().unwrap().host().is_some() {
return req.uri_ref().unwrap().host().unwrap();

View File

@@ -1883,14 +1883,14 @@ impl StorageAPI for ECStore {
let object = encode_dir_object(object);
if self.single_pool() {
self.pools[0].add_partial(bucket, object.as_str(), version_id).await;
let _ = self.pools[0].add_partial(bucket, object.as_str(), version_id).await;
}
let idx = self
.get_pool_idx_existing_with_opts(bucket, object.as_str(), &ObjectOptions::default())
.await?;
self.pools[idx].add_partial(bucket, object.as_str(), version_id).await;
let _ = self.pools[idx].add_partial(bucket, object.as_str(), version_id).await;
Ok(())
}
#[tracing::instrument(skip(self))]

View File

@@ -1,3 +1,10 @@
#![allow(unused_imports)]
#![allow(unused_variables)]
#![allow(unused_mut)]
#![allow(unused_assignments)]
#![allow(unused_must_use)]
#![allow(clippy::all)]
use bytes::Bytes;
use http::status::StatusCode;
use lazy_static::lazy_static;

View File

@@ -1,8 +1,11 @@
use http::status::StatusCode;
use rand::Rng;
#![allow(unused_imports)]
#![allow(unused_variables)]
#![allow(unused_mut)]
#![allow(unused_assignments)]
#![allow(unused_must_use)]
#![allow(clippy::all)]
use serde::{Deserialize, Serialize};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tracing::warn;
#[derive(Serialize, Deserialize, Default, Debug, Clone)]
#[serde(default)]

View File

@@ -1,8 +1,15 @@
#![allow(unused_imports)]
#![allow(unused_variables)]
#![allow(unused_mut)]
#![allow(unused_assignments)]
#![allow(unused_must_use)]
#![allow(clippy::all)]
use serde::{Deserialize, Serialize};
use std::fmt::Display;
use tracing::info;
const C_TierConfigVer: &str = "v1";
const C_TIER_CONFIG_VER: &str = "v1";
const ERR_TIER_NAME_EMPTY: &str = "remote tier name empty";
const ERR_TIER_INVALID_CONFIG: &str = "invalid tier config";
@@ -112,7 +119,7 @@ impl Clone for TierConfig {
version: self.version.clone(),
tier_type: self.tier_type.clone(),
name: self.name.clone(),
s3: s3,
s3,
//azure: az,
//gcs: gcs,
rustfs: r,
@@ -237,7 +244,7 @@ impl TierS3 {
}
Ok(TierConfig {
version: C_TierConfigVer.to_string(),
version: C_TIER_CONFIG_VER.to_string(),
tier_type: TierType::S3,
name: name.to_string(),
s3: Some(sc),
@@ -306,7 +313,7 @@ impl TierMinIO {
}
Ok(TierConfig {
version: C_TierConfigVer.to_string(),
version: C_TIER_CONFIG_VER.to_string(),
tier_type: TierType::MinIO,
name: name.to_string(),
minio: Some(m),

View File

@@ -1,48 +0,0 @@
use std::{fmt::Display, pin::Pin, sync::Arc};
use tracing::info;
use common::error::{Error, Result};
use crate::bucket::tier_config::{TierType, TierConfig,};
impl TierType {
fn decode_msg(&self/*, dc *msgp.Reader*/) -> Result<()> {
todo!();
}
fn encode_msg(&self/*, en *msgp.Writer*/) -> Result<()> {
todo!();
}
pub fn marshal_msg(&self, b: &[u8]) -> Result<Vec<u8>> {
todo!();
}
pub fn unmarshal_msg(&self, bts: &[u8]) -> Result<Vec<u8>> {
todo!();
}
pub fn msg_size() -> usize {
todo!();
}
}
impl TierConfig {
fn decode_msg(&self, dc *msgp.Reader) -> Result<()> {
todo!();
}
pub fn encode_msg(&self, en *msgp.Writer) -> Result<()> {
todo!();
}
pub fn marshal_msg(&self, b: &[u8]) -> Result<Vec<u8>> {
todo!();
}
pub fn unmarshal_msg(&self, bts: &[u8]) -> Result<Vec<u8>> {
todo!();
}
fn msg_size(&self) -> usize {
todo!();
}
}

View File

@@ -1,3 +1,10 @@
#![allow(unused_imports)]
#![allow(unused_variables)]
#![allow(unused_mut)]
#![allow(unused_assignments)]
#![allow(unused_must_use)]
#![allow(clippy::all)]
use crate::tier::tier::TierConfigMgr;
impl TierConfigMgr {

View File

@@ -1,6 +1,5 @@
use crate::client::admin_handler_utils::AdminError;
use http::status::StatusCode;
use tracing::warn;
pub const ERR_TIER_ALREADY_EXISTS: AdminError = AdminError {
code: "XRustFSAdminTierAlreadyExists",

View File

@@ -1,3 +1,10 @@
#![allow(unused_imports)]
#![allow(unused_variables)]
#![allow(unused_mut)]
#![allow(unused_assignments)]
#![allow(unused_must_use)]
#![allow(clippy::all)]
use bytes::Bytes;
use std::collections::HashMap;

View File

@@ -1,5 +1,12 @@
#![allow(unused_imports)]
#![allow(unused_variables)]
#![allow(unused_mut)]
#![allow(unused_assignments)]
#![allow(unused_must_use)]
#![allow(clippy::all)]
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::sync::Arc;
use crate::client::{
admin_handler_utils::AdminError,
@@ -46,7 +53,7 @@ impl WarmBackendMinIO {
..Default::default()
}));
let opts = Options {
creds: creds,
creds,
secure: u.scheme() == "https",
//transport: GLOBAL_RemoteTargetTransport,
trailing_headers: true,

View File

@@ -1,6 +1,12 @@
#![allow(unused_imports)]
#![allow(unused_variables)]
#![allow(unused_mut)]
#![allow(unused_assignments)]
#![allow(unused_must_use)]
#![allow(clippy::all)]
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use tracing::warn;
use std::sync::Arc;
use crate::client::{
admin_handler_utils::AdminError,
@@ -44,7 +50,7 @@ impl WarmBackendRustFS {
..Default::default()
}));
let opts = Options {
creds: creds,
creds,
secure: u.scheme() == "https",
//transport: GLOBAL_RemoteTargetTransport,
trailing_headers: true,

View File

@@ -1,3 +1,10 @@
#![allow(unused_imports)]
#![allow(unused_variables)]
#![allow(unused_mut)]
#![allow(unused_assignments)]
#![allow(unused_must_use)]
#![allow(clippy::all)]
use std::collections::HashMap;
use std::sync::Arc;
use url::Url;
@@ -54,7 +61,7 @@ impl WarmBackendS3 {
return Err(std::io::Error::other("no bucket name was provided"));
}
let mut creds: Credentials<Static>;
let creds: Credentials<Static>;
if conf.access_key != "" && conf.secret_key != "" {
//creds = Credentials::new_static_v4(conf.access_key, conf.secret_key, "");
@@ -69,7 +76,7 @@ impl WarmBackendS3 {
return Err(std::io::Error::other("insufficient parameters for S3 backend authentication"));
}
let opts = Options {
creds: creds,
creds,
secure: u.scheme() == "https",
//transport: GLOBAL_RemoteTargetTransport,
..Default::default()