mirror of
https://github.com/rustfs/rustfs.git
synced 2026-01-17 01:30:33 +00:00
fix clippy
This commit is contained in:
@@ -699,13 +699,12 @@ impl DataUsageCache {
|
||||
|
||||
pub fn search_parent(&self, hash: &DataUsageHash) -> Option<DataUsageHash> {
|
||||
let want = hash.key();
|
||||
if let Some(last_index) = want.rfind('/') {
|
||||
if let Some(v) = self.find(&want[0..last_index]) {
|
||||
if v.children.contains(&want) {
|
||||
let found = hash_path(&want[0..last_index]);
|
||||
return Some(found);
|
||||
}
|
||||
}
|
||||
if let Some(last_index) = want.rfind('/')
|
||||
&& let Some(v) = self.find(&want[0..last_index])
|
||||
&& v.children.contains(&want)
|
||||
{
|
||||
let found = hash_path(&want[0..last_index]);
|
||||
return Some(found);
|
||||
}
|
||||
|
||||
for (k, v) in self.cache.iter() {
|
||||
@@ -1455,10 +1454,10 @@ impl DataUsageInfo {
|
||||
self.buckets_count = self.buckets_usage.len() as u64;
|
||||
|
||||
// Update last update time
|
||||
if let Some(other_update) = other.last_update {
|
||||
if self.last_update.is_none() || other_update > self.last_update.unwrap() {
|
||||
self.last_update = Some(other_update);
|
||||
}
|
||||
if let Some(other_update) = other.last_update
|
||||
&& (self.last_update.is_none() || other_update > self.last_update.unwrap())
|
||||
{
|
||||
self.last_update = Some(other_update);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -479,21 +479,21 @@ impl Metrics {
|
||||
// Lifetime operations
|
||||
for i in 0..Metric::Last as usize {
|
||||
let count = self.operations[i].load(Ordering::Relaxed);
|
||||
if count > 0 {
|
||||
if let Some(metric) = Metric::from_index(i) {
|
||||
metrics.life_time_ops.insert(metric.as_str().to_string(), count);
|
||||
}
|
||||
if count > 0
|
||||
&& let Some(metric) = Metric::from_index(i)
|
||||
{
|
||||
metrics.life_time_ops.insert(metric.as_str().to_string(), count);
|
||||
}
|
||||
}
|
||||
|
||||
// Last minute statistics for realtime metrics
|
||||
for i in 0..Metric::LastRealtime as usize {
|
||||
let last_min = self.latency[i].total().await;
|
||||
if last_min.n > 0 {
|
||||
if let Some(_metric) = Metric::from_index(i) {
|
||||
// Convert to madmin TimedAction format if needed
|
||||
// This would require implementing the conversion
|
||||
}
|
||||
if last_min.n > 0
|
||||
&& let Some(_metric) = Metric::from_index(i)
|
||||
{
|
||||
// Convert to madmin TimedAction format if needed
|
||||
// This would require implementing the conversion
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -459,15 +459,15 @@ impl FolderScanner {
|
||||
return;
|
||||
}
|
||||
|
||||
if let Some(flat) = self.update_cache.size_recursive(&self.new_cache.info.name) {
|
||||
if let Some(ref updates) = self.updates {
|
||||
// Try to send without blocking
|
||||
if let Err(e) = updates.send(flat.clone()).await {
|
||||
error!("send_update: failed to send update: {}", e);
|
||||
}
|
||||
self.last_update = SystemTime::now();
|
||||
debug!("send_update: sent update for folder: {}", self.new_cache.info.name);
|
||||
if let Some(flat) = self.update_cache.size_recursive(&self.new_cache.info.name)
|
||||
&& let Some(ref updates) = self.updates
|
||||
{
|
||||
// Try to send without blocking
|
||||
if let Err(e) = updates.send(flat.clone()).await {
|
||||
error!("send_update: failed to send update: {}", e);
|
||||
}
|
||||
self.last_update = SystemTime::now();
|
||||
debug!("send_update: sent update for folder: {}", self.new_cache.info.name);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -659,10 +659,10 @@ impl FolderScanner {
|
||||
Err(e) => {
|
||||
warn!("scan_folder: failed to get size for item {}: {}", item.path, e);
|
||||
// TODO: check error type
|
||||
if let Some(t) = wait {
|
||||
if let Ok(elapsed) = t.elapsed() {
|
||||
tokio::time::sleep(elapsed).await;
|
||||
}
|
||||
if let Some(t) = wait
|
||||
&& let Ok(elapsed) = t.elapsed()
|
||||
{
|
||||
tokio::time::sleep(elapsed).await;
|
||||
}
|
||||
|
||||
if e != StorageError::other("skip file".to_string()) {
|
||||
@@ -684,10 +684,10 @@ impl FolderScanner {
|
||||
into.add_sizes(&sz);
|
||||
into.objects += 1;
|
||||
|
||||
if let Some(t) = wait {
|
||||
if let Ok(elapsed) = t.elapsed() {
|
||||
tokio::time::sleep(elapsed).await;
|
||||
}
|
||||
if let Some(t) = wait
|
||||
&& let Ok(elapsed) = t.elapsed()
|
||||
{
|
||||
tokio::time::sleep(elapsed).await;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1055,30 +1055,31 @@ impl FolderScanner {
|
||||
self.new_cache.replace_hashed(&this_hash, &folder.parent, into);
|
||||
}
|
||||
|
||||
if !into.compacted && self.new_cache.info.name != folder.name {
|
||||
if let Some(mut flat) = self.new_cache.size_recursive(&this_hash.key()) {
|
||||
flat.compacted = true;
|
||||
let mut should_compact = false;
|
||||
if !into.compacted
|
||||
&& self.new_cache.info.name != folder.name
|
||||
&& let Some(mut flat) = self.new_cache.size_recursive(&this_hash.key())
|
||||
{
|
||||
flat.compacted = true;
|
||||
let mut should_compact = false;
|
||||
|
||||
if flat.objects < DATA_SCANNER_COMPACT_LEAST_OBJECT {
|
||||
should_compact = true;
|
||||
} else {
|
||||
// Compact if we only have objects as children...
|
||||
should_compact = true;
|
||||
for k in &into.children {
|
||||
if let Some(v) = self.new_cache.cache.get(k) {
|
||||
if !v.children.is_empty() || v.objects > 1 {
|
||||
should_compact = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if flat.objects < DATA_SCANNER_COMPACT_LEAST_OBJECT {
|
||||
should_compact = true;
|
||||
} else {
|
||||
// Compact if we only have objects as children...
|
||||
should_compact = true;
|
||||
for k in &into.children {
|
||||
if let Some(v) = self.new_cache.cache.get(k)
|
||||
&& (!v.children.is_empty() || v.objects > 1)
|
||||
{
|
||||
should_compact = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if should_compact {
|
||||
self.new_cache.delete_recursive(&this_hash);
|
||||
self.new_cache.replace_hashed(&this_hash, &folder.parent, &flat);
|
||||
}
|
||||
if should_compact {
|
||||
self.new_cache.delete_recursive(&this_hash);
|
||||
self.new_cache.replace_hashed(&this_hash, &folder.parent, &flat);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -167,13 +167,12 @@ impl ScannerIO for ECStore {
|
||||
all_merged.merge(result);
|
||||
}
|
||||
|
||||
if all_merged.root().is_some() && all_merged.info.last_update.unwrap() > last_update {
|
||||
if let Err(e) = updates
|
||||
if all_merged.root().is_some() && all_merged.info.last_update.unwrap() > last_update
|
||||
&& let Err(e) = updates
|
||||
.send(all_merged.dui(&all_merged.info.name, &all_buckets_clone))
|
||||
.await {
|
||||
error!("Failed to send data usage info: {}", e);
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
_ = ticker.tick() => {
|
||||
@@ -245,10 +244,10 @@ impl ScannerIOCache for SetDisks {
|
||||
permutes.shuffle(&mut rand::rng());
|
||||
|
||||
for bucket in permutes.iter() {
|
||||
if old_cache.find(&bucket.name).is_none() {
|
||||
if let Err(e) = bucket_tx.send(bucket.clone()).await {
|
||||
error!("Failed to send bucket info: {}", e);
|
||||
}
|
||||
if old_cache.find(&bucket.name).is_none()
|
||||
&& let Err(e) = bucket_tx.send(bucket.clone()).await
|
||||
{
|
||||
error!("Failed to send bucket info: {}", e);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -272,7 +271,7 @@ impl ScannerIOCache for SetDisks {
|
||||
let store_clone = self.clone();
|
||||
let ctx_clone = ctx.clone();
|
||||
let send_update_fut = tokio::spawn(async move {
|
||||
let mut ticker = tokio::time::interval(Duration::from_secs(30 + rand::random::<u64>() % 10));
|
||||
let mut ticker = tokio::time::interval(Duration::from_secs(3 + rand::random::<u64>() % 10));
|
||||
|
||||
let mut last_update = None;
|
||||
|
||||
@@ -403,12 +402,11 @@ impl ScannerIOCache for SetDisks {
|
||||
Err(e) => {
|
||||
error!("Failed to scan disk: {}", e);
|
||||
|
||||
if let (Some(last_update), Some(before_update)) = (cache.info.last_update, before) {
|
||||
if last_update > before_update {
|
||||
if let Err(e) = cache.save(store_clone_clone.clone(), cache_name.as_str()).await {
|
||||
error!("Failed to save data usage cache: {}", e);
|
||||
}
|
||||
}
|
||||
if let (Some(last_update), Some(before_update)) = (cache.info.last_update, before)
|
||||
&& last_update > before_update
|
||||
&& let Err(e) = cache.save(store_clone_clone.clone(), cache_name.as_str()).await
|
||||
{
|
||||
error!("Failed to save data usage cache: {}", e);
|
||||
}
|
||||
|
||||
if let Err(e) = update_fut.await {
|
||||
@@ -576,13 +574,13 @@ impl ScannerIODisk for Disk {
|
||||
OffsetDateTime::now_utc(),
|
||||
));
|
||||
|
||||
if replication_config.has_active_rules("", true) {
|
||||
if let Ok(targets) = BucketTargetSys::get().list_bucket_targets(&cache.info.name).await {
|
||||
cache.info.replication = Some(Arc::new(ReplicationConfig {
|
||||
config: Some(replication_config),
|
||||
remotes: Some(targets),
|
||||
}));
|
||||
}
|
||||
if replication_config.has_active_rules("", true)
|
||||
&& let Ok(targets) = BucketTargetSys::get().list_bucket_targets(&cache.info.name).await
|
||||
{
|
||||
cache.info.replication = Some(Arc::new(ReplicationConfig {
|
||||
config: Some(replication_config),
|
||||
remotes: Some(targets),
|
||||
}));
|
||||
}
|
||||
|
||||
// TODO: object lock
|
||||
|
||||
Reference in New Issue
Block a user