From 7a7ca6e154e812ead0dda345697ef93d37cad7c2 Mon Sep 17 00:00:00 2001
From: weisd
Date: Fri, 20 Jun 2025 17:57:05 +0800
Subject: [PATCH] move deleted objects to trash and clean them up
 automatically in the background

Instead of deleting objects inline, move_to_trash now renames them into
the RUSTFS_META_TMP_DELETED_BUCKET volume. Each LocalDisk spawns a
background task that empties this trash every five minutes and exits on
a broadcast signal when the disk is dropped. Sets swaps its
CancellationToken for the same broadcast-based exit signal, SetDisks
gains a constructor and a get_local_disks helper, and the noisy
warn-level rebalance logging is downgraded to info.
---
 ecstore/src/disk/local.rs |  96 +++++++++++++++++++---
 ecstore/src/disk/os.rs    |  23 +++---
 ecstore/src/rebalance.rs  | 162 +++++++++++++++++++-------------------
 ecstore/src/set_disk.rs   |  48 ++++++++++-
 ecstore/src/sets.rs       |  82 ++++++++++++++-----
 ecstore/src/store_api.rs  |   2 -
 ecstore/src/store_init.rs |   9 +--
 ecstore/src/tier/tier.rs  |   2 +-
 8 files changed, 292 insertions(+), 132 deletions(-)

diff --git a/ecstore/src/disk/local.rs b/ecstore/src/disk/local.rs
index a5d3d053..a4f4d8af 100644
--- a/ecstore/src/disk/local.rs
+++ b/ecstore/src/disk/local.rs
@@ -18,7 +18,7 @@ use crate::disk::fs::{
 use crate::disk::os::{check_path_length, is_empty_dir};
 use crate::disk::{
     CHECK_PART_FILE_CORRUPT, CHECK_PART_FILE_NOT_FOUND, CHECK_PART_SUCCESS, CHECK_PART_UNKNOWN, CHECK_PART_VOLUME_NOT_FOUND,
-    FileReader, conv_part_err_to_int,
+    FileReader, RUSTFS_META_TMP_DELETED_BUCKET, conv_part_err_to_int,
 };
 use crate::disk::{FileWriter, STORAGE_FORMAT_FILE};
 use crate::global::{GLOBAL_IsErasureSD, GLOBAL_RootDiskThreshold};
@@ -36,6 +36,7 @@ use rustfs_utils::path::{
     GLOBAL_DIR_SUFFIX, GLOBAL_DIR_SUFFIX_WITH_SLASH, SLASH_SEPARATOR, clean, decode_dir_object, encode_dir_object, has_suffix,
     path_join, path_join_buf,
 };
+use tokio::time::interval;
 
 use crate::erasure_coding::bitrot_verify;
 use bytes::Bytes;
@@ -105,6 +106,15 @@ pub struct LocalDisk {
     // pub format_data: Mutex<Vec<u8>>,
     // pub format_file_info: Mutex<Option<FileInfo>>,
     // pub format_last_check: Mutex<Option<OffsetDateTime>>,
+    exit_signal: Option<tokio::sync::broadcast::Sender<()>>,
+}
+
+impl Drop for LocalDisk {
+    fn drop(&mut self) {
+        if let Some(exit_signal) = self.exit_signal.take() {
+            let _ = exit_signal.send(());
+        }
+    }
 }
 
 impl Debug for LocalDisk {
@@ -209,6 +219,7 @@ impl LocalDisk {
             // format_file_info: Mutex::new(format_meta),
             // format_data: Mutex::new(format_data),
             // format_last_check: Mutex::new(format_last_check),
+            exit_signal: None,
         };
         let (info, _root) = get_disk_info(root).await?;
         disk.major = info.major;
@@ -229,9 +240,61 @@ impl LocalDisk {
 
         disk.make_meta_volumes().await?;
 
+        let (exit_tx, exit_rx) = tokio::sync::broadcast::channel(1);
+        disk.exit_signal = Some(exit_tx);
+
+        let root = disk.root.clone();
+        tokio::spawn(Self::cleanup_deleted_objects_loop(root, exit_rx));
+
         Ok(disk)
     }
 
+    async fn cleanup_deleted_objects_loop(root: PathBuf, mut exit_rx: tokio::sync::broadcast::Receiver<()>) {
+        let mut interval = interval(Duration::from_secs(60 * 5));
+        loop {
+            tokio::select! {
+                _ = interval.tick() => {
+                    if let Err(err) = Self::cleanup_deleted_objects(root.clone()).await {
+                        error!("cleanup_deleted_objects error: {:?}", err);
+                    }
+                }
+                _ = exit_rx.recv() => {
+                    info!("cleanup_deleted_objects_loop exit");
+                    break;
+                }
+            }
+        }
+    }
+
+    async fn cleanup_deleted_objects(root: PathBuf) -> Result<()> {
+        let trash = path_join(&[root, RUSTFS_META_TMP_DELETED_BUCKET.into()]);
+        let mut entries = fs::read_dir(&trash).await?;
+        while let Some(entry) = entries.next_entry().await? {
+            let name = entry.file_name().to_string_lossy().to_string();
+            if name.is_empty() || name == "." || name == ".." {
+                continue;
+            }
+
+            let file_type = entry.file_type().await?;
+
+            let path = path_join(&[trash.clone(), name.into()]);
+
+            if file_type.is_dir() {
+                if let Err(e) = tokio::fs::remove_dir_all(path).await {
+                    if e.kind() != ErrorKind::NotFound {
+                        return Err(e.into());
+                    }
+                }
+            } else if let Err(e) = tokio::fs::remove_file(path).await {
+                if e.kind() != ErrorKind::NotFound {
+                    return Err(e.into());
+                }
+            }
+        }
+
+        Ok(())
+    }
+
     fn is_valid_volname(volname: &str) -> bool {
         if volname.len() < 3 {
             return false;
         }
@@ -268,7 +331,14 @@ impl LocalDisk {
         let multipart = format!("{}/{}", RUSTFS_META_BUCKET, "multipart");
         let config = format!("{}/{}", RUSTFS_META_BUCKET, "config");
         let tmp = format!("{}/{}", RUSTFS_META_BUCKET, "tmp");
-        let defaults = vec![buckets.as_str(), multipart.as_str(), config.as_str(), tmp.as_str()];
+
+        let defaults = vec![
+            buckets.as_str(),
+            multipart.as_str(),
+            config.as_str(),
+            tmp.as_str(),
+            RUSTFS_META_TMP_DELETED_BUCKET,
+        ];
 
         self.make_volumes(defaults).await
     }
@@ -308,22 +378,22 @@ impl LocalDisk {
     #[allow(unreachable_code)]
     #[allow(unused_variables)]
     pub async fn move_to_trash(&self, delete_path: &PathBuf, recursive: bool, immediate_purge: bool) -> Result<()> {
-        if recursive {
-            remove_all_std(delete_path).map_err(to_volume_error)?;
-        } else {
-            remove_std(delete_path).map_err(to_file_error)?;
-        }
+        // if recursive {
+        //     remove_all_std(delete_path).map_err(to_volume_error)?;
+        // } else {
+        //     remove_std(delete_path).map_err(to_file_error)?;
+        // }
 
-        return Ok(());
+        // return Ok(());
 
         // TODO: notify asynchronously, check disk space, and empty the trash
         let trash_path = self.get_object_path(super::RUSTFS_META_TMP_DELETED_BUCKET, Uuid::new_v4().to_string().as_str())?;
 
-        if let Some(parent) = trash_path.parent() {
-            if !parent.exists() {
-                fs::create_dir_all(parent).await?;
-            }
-        }
+        // if let Some(parent) = trash_path.parent() {
+        //     if !parent.exists() {
+        //         fs::create_dir_all(parent).await?;
+        //     }
+        // }
 
         let err = if recursive {
             rename_all(delete_path, trash_path, self.get_bucket_path(super::RUSTFS_META_TMP_DELETED_BUCKET)?)
diff --git a/ecstore/src/disk/os.rs b/ecstore/src/disk/os.rs
index d21e88b5..6158c1d4 100644
--- a/ecstore/src/disk/os.rs
+++ b/ecstore/src/disk/os.rs
@@ -6,6 +6,7 @@ use std::{
 
 use super::error::Result;
 use crate::disk::error_conv::to_file_error;
 use tokio::fs;
+use tracing::warn;
 
 use super::error::DiskError;
 
@@ -113,7 +114,7 @@ pub async fn rename_all(
     Ok(())
 }
 
-pub async fn reliable_rename(
+async fn reliable_rename(
    src_file_path: impl AsRef<Path>,
    dst_file_path: impl AsRef<Path>,
    base_dir: impl AsRef<Path>,
@@ -128,17 +129,21 @@ pub async fn reliable_rename(
     let mut i = 0;
     loop {
         if let Err(e) = super::fs::rename_std(src_file_path.as_ref(), dst_file_path.as_ref()) {
-            if e.kind() == io::ErrorKind::NotFound && i == 0 {
+            if e.kind() == io::ErrorKind::NotFound {
+                break;
+            }
+
+            if i == 0 {
                 i += 1;
                 continue;
             }
 
-            // info!(
-            //     "reliable_rename failed. src_file_path: {:?}, dst_file_path: {:?}, base_dir: {:?}, err: {:?}",
-            //     src_file_path.as_ref(),
-            //     dst_file_path.as_ref(),
-            //     base_dir.as_ref(),
-            //     e
-            // );
+            warn!(
+                "reliable_rename failed. src_file_path: {:?}, dst_file_path: {:?}, base_dir: {:?}, err: {:?}",
+                src_file_path.as_ref(),
+                dst_file_path.as_ref(),
+                base_dir.as_ref(),
+                e
+            );
 
             return Err(e);
         }
diff --git a/ecstore/src/rebalance.rs b/ecstore/src/rebalance.rs
index e68f448f..2c15e178 100644
--- a/ecstore/src/rebalance.rs
+++ b/ecstore/src/rebalance.rs
@@ -21,7 +21,7 @@ use time::OffsetDateTime;
 use tokio::io::{AsyncReadExt, BufReader};
 use tokio::sync::broadcast::{self, Receiver as B_Receiver};
 use tokio::time::{Duration, Instant};
-use tracing::{error, info, warn};
+use tracing::{error, info};
 use uuid::Uuid;
 
 const REBAL_META_FMT: u16 = 1; // Replace with actual format value
@@ -162,7 +162,7 @@ impl RebalanceMeta {
     pub async fn load_with_opts(&mut self, store: Arc<ECStore>, opts: ObjectOptions) -> Result<()> {
         let (data, _) = read_config_with_metadata(store, REBAL_META_NAME, &opts).await?;
         if data.is_empty() {
-            warn!("rebalanceMeta load_with_opts: no data");
+            info!("rebalanceMeta load_with_opts: no data");
             return Ok(());
         }
         if data.len() <= 4 {
@@ -184,7 +184,7 @@ impl RebalanceMeta {
 
         self.last_refreshed_at = Some(OffsetDateTime::now_utc());
 
-        warn!("rebalanceMeta load_with_opts: loaded meta done");
+        info!("rebalanceMeta load_with_opts: loaded meta done");
 
         Ok(())
     }
@@ -194,7 +194,7 @@ impl RebalanceMeta {
 
     pub async fn save_with_opts(&self, store: Arc<ECStore>, opts: ObjectOptions) -> Result<()> {
         if self.pool_stats.is_empty() {
-            warn!("rebalanceMeta save_with_opts: no pool stats");
+            info!("rebalanceMeta save_with_opts: no pool stats");
             return Ok(());
         }
 
@@ -217,10 +217,10 @@ impl ECStore {
     #[tracing::instrument(skip_all)]
     pub async fn load_rebalance_meta(&self) -> Result<()> {
         let mut meta = RebalanceMeta::new();
-        warn!("rebalanceMeta: store load rebalance meta");
+        info!("rebalanceMeta: store load rebalance meta");
         match meta.load(self.pools[0].clone()).await {
             Ok(_) => {
-                warn!("rebalanceMeta: rebalance meta loaded0");
+                info!("rebalanceMeta: rebalance meta loaded0");
 
                 {
                     let mut rebalance_meta = self.rebalance_meta.write().await;
@@ -229,12 +229,12 @@ impl ECStore {
                     drop(rebalance_meta);
                 }
 
-                warn!("rebalanceMeta: rebalance meta loaded1");
+                info!("rebalanceMeta: rebalance meta loaded1");
 
                 if let Err(err) = self.update_rebalance_stats().await {
                     error!("Failed to update rebalance stats: {}", err);
                 } else {
-                    warn!("rebalanceMeta: rebalance meta loaded2");
+                    info!("rebalanceMeta: rebalance meta loaded2");
                 }
             }
             Err(err) => {
@@ -243,7 +243,7 @@ impl ECStore {
                     return Err(err);
                 }
 
-                warn!("rebalanceMeta: not found, rebalance not started");
+                info!("rebalanceMeta: not found, rebalance not started");
             }
         }
 
@@ -259,13 +259,13 @@ impl ECStore {
             rebalance_meta.as_ref().map(|v| v.pool_stats.clone()).unwrap_or_default()
         };
 
-        warn!("update_rebalance_stats: pool_stats: {:?}", &pool_stats);
+        info!("update_rebalance_stats: pool_stats: {:?}", &pool_stats);
 
         for i in 0..self.pools.len() {
             if pool_stats.get(i).is_none() {
-                warn!("update_rebalance_stats: pool {} not found", i);
+                info!("update_rebalance_stats: pool {} not found", i);
                 let mut rebalance_meta = self.rebalance_meta.write().await;
-                warn!("update_rebalance_stats: pool {} not found, add", i);
+                info!("update_rebalance_stats: pool {} not found, add", i);
                 if let Some(meta) = rebalance_meta.as_mut() {
                     meta.pool_stats.push(RebalanceStats::default());
                 }
@@ -275,7 +275,7 @@ impl ECStore {
         }
 
         if ok {
-            warn!("update_rebalance_stats: save rebalance meta");
+            info!("update_rebalance_stats: save rebalance meta");
             let rebalance_meta = self.rebalance_meta.read().await;
 
            if let Some(meta) = rebalance_meta.as_ref() {
@@ -296,7 +296,7 @@ impl ECStore {
 
     #[tracing::instrument(skip(self))]
     pub async fn init_rebalance_meta(&self, bucktes: Vec<String>) -> Result<String> {
-        warn!("init_rebalance_meta: start rebalance");
+        info!("init_rebalance_meta: start rebalance");
         let si = self.storage_info().await;
 
         let mut disk_stats = vec![DiskStat::default(); self.pools.len()];
@@ -351,7 +351,7 @@ impl ECStore {
 
         meta.save(self.pools[0].clone()).await?;
 
-        warn!("init_rebalance_meta: rebalance meta saved");
+        info!("init_rebalance_meta: rebalance meta saved");
 
         let id = meta.id.clone();
 
@@ -378,26 +378,26 @@ impl ECStore {
 
     #[tracing::instrument(skip(self))]
     pub async fn next_rebal_bucket(&self, pool_index: usize) -> Result<Option<String>> {
-        warn!("next_rebal_bucket: pool_index: {}", pool_index);
+        info!("next_rebal_bucket: pool_index: {}", pool_index);
         let rebalance_meta = self.rebalance_meta.read().await;
-        warn!("next_rebal_bucket: rebalance_meta: {:?}", rebalance_meta);
+        info!("next_rebal_bucket: rebalance_meta: {:?}", rebalance_meta);
         if let Some(meta) = rebalance_meta.as_ref() {
             if let Some(pool_stat) = meta.pool_stats.get(pool_index) {
                 if pool_stat.info.status == RebalStatus::Completed || !pool_stat.participating {
-                    warn!("next_rebal_bucket: pool_index: {} completed or not participating", pool_index);
+                    info!("next_rebal_bucket: pool_index: {} completed or not participating", pool_index);
                     return Ok(None);
                 }
 
                 if pool_stat.buckets.is_empty() {
-                    warn!("next_rebal_bucket: pool_index: {} buckets is empty", pool_index);
+                    info!("next_rebal_bucket: pool_index: {} buckets is empty", pool_index);
                     return Ok(None);
                 }
-                warn!("next_rebal_bucket: pool_index: {} bucket: {}", pool_index, pool_stat.buckets[0]);
+                info!("next_rebal_bucket: pool_index: {} bucket: {}", pool_index, pool_stat.buckets[0]);
                 return Ok(Some(pool_stat.buckets[0].clone()));
             }
         }
-        warn!("next_rebal_bucket: pool_index: {} None", pool_index);
+        info!("next_rebal_bucket: pool_index: {} None", pool_index);
         Ok(None)
     }
 
@@ -406,7 +406,7 @@ impl ECStore {
         let mut rebalance_meta = self.rebalance_meta.write().await;
         if let Some(meta) = rebalance_meta.as_mut() {
             if let Some(pool_stat) = meta.pool_stats.get_mut(pool_index) {
-                warn!("bucket_rebalance_done: buckets {:?}", &pool_stat.buckets);
+                info!("bucket_rebalance_done: buckets {:?}", &pool_stat.buckets);
 
                 // Use retain to filter out the bucket to be removed
                 let mut found = false;
@@ -421,14 +421,14 @@ impl ECStore {
                 });
 
                 if found {
-                    warn!("bucket_rebalance_done: bucket {} rebalanced", &bucket);
+                    info!("bucket_rebalance_done: bucket {} rebalanced", &bucket);
                     return Ok(());
                 } else {
-                    warn!("bucket_rebalance_done: bucket {} not found", bucket);
+                    info!("bucket_rebalance_done: bucket {} not found", bucket);
                 }
             }
         }
-        warn!("bucket_rebalance_done: bucket {} not found", bucket);
+        info!("bucket_rebalance_done: bucket {} not found", bucket);
 
         Ok(())
     }
@@ -436,12 +436,12 @@ impl ECStore {
         let rebalance_meta = self.rebalance_meta.read().await;
         if let Some(ref meta) = *rebalance_meta {
             if meta.stopped_at.is_some() {
-                warn!("is_rebalance_started: rebalance stopped");
+                info!("is_rebalance_started: rebalance stopped");
                 return false;
             }
 
             meta.pool_stats.iter().enumerate().for_each(|(i, v)| {
-                warn!(
+                info!(
                     "is_rebalance_started: pool_index: {}, participating: {:?}, status: {:?}",
                     i, v.participating, v.info.status
                 );
@@ -452,12 +452,12 @@ impl ECStore {
                 .iter()
                 .any(|v| v.participating && v.info.status != RebalStatus::Completed)
             {
-                warn!("is_rebalance_started: rebalance started");
+                info!("is_rebalance_started: rebalance started");
                 return true;
            }
        }
 
-        warn!("is_rebalance_started: rebalance not started");
+        info!("is_rebalance_started: rebalance not started");
         false
     }
 
@@ -490,7 +490,7 @@ impl ECStore {
 
     #[tracing::instrument(skip_all)]
     pub async fn start_rebalance(self: &Arc<Self>) {
-        warn!("start_rebalance: start rebalance");
+        info!("start_rebalance: start rebalance");
         // let rebalance_meta = self.rebalance_meta.read().await;
 
         let (tx, rx) = broadcast::channel::<bool>(1);
@@ -501,7 +501,7 @@ impl ECStore {
             if let Some(meta) = rebalance_meta.as_mut() {
                 meta.cancel = Some(tx)
             } else {
-                warn!("start_rebalance: rebalance_meta is None exit");
+                info!("start_rebalance: rebalance_meta is None exit");
                 return;
             }
 
@@ -517,36 +517,36 @@ impl ECStore {
                 let mut participants = vec![false; meta.pool_stats.len()];
 
                 for (i, pool_stat) in meta.pool_stats.iter().enumerate() {
-                    warn!("start_rebalance: pool {} status: {:?}", i, pool_stat.info.status);
+                    info!("start_rebalance: pool {} status: {:?}", i, pool_stat.info.status);
                     if pool_stat.info.status != RebalStatus::Started {
-                        warn!("start_rebalance: pool {} not started, skipping", i);
+                        info!("start_rebalance: pool {} not started, skipping", i);
                         continue;
                     }
 
-                    warn!("start_rebalance: pool {} participating: {:?}", i, pool_stat.participating);
+                    info!("start_rebalance: pool {} participating: {:?}", i, pool_stat.participating);
                     participants[i] = pool_stat.participating;
                 }
 
                 participants
             } else {
-                warn!("start_rebalance:2 rebalance_meta is None exit");
+                info!("start_rebalance:2 rebalance_meta is None exit");
                 Vec::new()
             }
         };
 
         for (idx, participating) in participants.iter().enumerate() {
             if !*participating {
-                warn!("start_rebalance: pool {} is not participating, skipping", idx);
+                info!("start_rebalance: pool {} is not participating, skipping", idx);
                 continue;
             }
 
             if !get_global_endpoints().as_ref().get(idx).is_some_and(|v| {
-                warn!("start_rebalance: pool {} endpoints: {:?}", idx, v.endpoints);
+                info!("start_rebalance: pool {} endpoints: {:?}", idx, v.endpoints);
                 v.endpoints.as_ref().first().is_some_and(|e| {
-                    warn!("start_rebalance: pool {} endpoint: {:?}, is_local: {}", idx, e, e.is_local);
+                    info!("start_rebalance: pool {} endpoint: {:?}, is_local: {}", idx, e, e.is_local);
                     e.is_local
                 })
             }) {
-                warn!("start_rebalance: pool {} is not local, skipping", idx);
+                info!("start_rebalance: pool {} is not local, skipping", idx);
                 continue;
             }
 
@@ -562,7 +562,7 @@ impl ECStore {
             });
         }
 
-        warn!("start_rebalance: rebalance started done");
+        info!("start_rebalance: rebalance started done");
     }
 
     #[tracing::instrument(skip(self, rx))]
@@ -585,11 +585,11 @@ impl ECStore {
 
         let state = match result {
             Ok(_) => {
-                warn!("rebalance_buckets: completed");
+                info!("rebalance_buckets: completed");
                 msg = format!("Rebalance completed at {:?}", now);
                 RebalStatus::Completed},
             Err(err) => {
-                warn!("rebalance_buckets: error: {:?}", err);
+                info!("rebalance_buckets: error: {:?}", err);
                 // TODO: check stop
                 if err.to_string().contains("canceled") {
                     msg = format!("Rebalance stopped at {:?}", now);
@@ -602,11 +602,11 @@ impl ECStore {
         };
 
         {
-            warn!("rebalance_buckets: save rebalance meta, pool_index: {}, state: {:?}", pool_index, state);
+            info!("rebalance_buckets: save rebalance meta, pool_index: {}, state: {:?}", pool_index, state);
             let mut rebalance_meta = store.rebalance_meta.write().await;
 
             if let Some(rbm) = rebalance_meta.as_mut() {
-                warn!("rebalance_buckets: save rebalance meta2, pool_index: {}, state: {:?}", pool_index, state);
+                info!("rebalance_buckets: save rebalance meta2, pool_index: {}, state: {:?}", pool_index, state);
                 rbm.pool_stats[pool_index].info.status = state;
                 rbm.pool_stats[pool_index].info.end_time = Some(now);
             }
@@ -623,11 +623,11 @@ impl ECStore {
                 if let Err(err) = store.save_rebalance_stats(pool_index, RebalSaveOpt::Stats).await {
                     error!("{} err: {:?}", msg, err);
                 } else {
-                    warn!(msg);
+                    info!(msg);
                 }
 
                 if quit {
-                    warn!("{}: exiting save_task", msg);
+                    info!("{}: exiting save_task", msg);
                     return;
                 }
 
@@ -635,21 +635,21 @@ impl ECStore {
             }
         });
 
-        warn!("Pool {} rebalancing is started", pool_index);
+        info!("Pool {} rebalancing is started", pool_index);
 
         loop {
             if let Ok(true) = rx.try_recv() {
-                warn!("Pool {} rebalancing is stopped", pool_index);
+                info!("Pool {} rebalancing is stopped", pool_index);
                 done_tx.send(Err(Error::other("rebalance stopped canceled"))).await.ok();
                 break;
             }
 
             if let Some(bucket) = self.next_rebal_bucket(pool_index).await? {
-                warn!("Rebalancing bucket: start {}", bucket);
+                info!("Rebalancing bucket: start {}", bucket);
                 if let Err(err) = self.rebalance_bucket(rx.resubscribe(), bucket.clone(), pool_index).await {
                     if err.to_string().contains("not initialized") {
-                        warn!("rebalance_bucket: rebalance not initialized, continue");
+                        info!("rebalance_bucket: rebalance not initialized, continue");
                         continue;
                     }
                     error!("Error rebalancing bucket {}: {:?}", bucket, err);
@@ -657,19 +657,19 @@ impl ECStore {
                     break;
                 }
 
-                warn!("Rebalance bucket: done {} ", bucket);
+                info!("Rebalance bucket: done {} ", bucket);
 
                 self.bucket_rebalance_done(pool_index, bucket).await?;
             } else {
-                warn!("Rebalance bucket: no bucket to rebalance");
+                info!("Rebalance bucket: no bucket to rebalance");
                 break;
             }
         }
 
-        warn!("Pool {} rebalancing is done", pool_index);
+        info!("Pool {} rebalancing is done", pool_index);
         done_tx.send(Ok(())).await.ok();
         save_task.await.ok();
-        warn!("Pool {} rebalancing is done2", pool_index);
+        info!("Pool {} rebalancing is done2", pool_index);
         Ok(())
     }
 
@@ -680,7 +680,7 @@ impl ECStore {
             if let Some(pool_stat) = meta.pool_stats.get_mut(pool_index) {
                 // Check if the pool's rebalance status is already completed
                 if pool_stat.info.status == RebalStatus::Completed {
-                    warn!("check_if_rebalance_done: pool {} is already completed", pool_index);
+                    info!("check_if_rebalance_done: pool {} is already completed", pool_index);
                     return true;
                 }
 
@@ -691,7 +691,7 @@ impl ECStore {
                 if (pfi - meta.percent_free_goal).abs() <= 0.05 {
                     pool_stat.info.status = RebalStatus::Completed;
                     pool_stat.info.end_time = Some(OffsetDateTime::now_utc());
-                    warn!("check_if_rebalance_done: pool {} is completed, pfi: {}", pool_index, pfi);
+                    info!("check_if_rebalance_done: pool {} is completed, pfi: {}", pool_index, pfi);
                     return true;
                 }
             }
@@ -710,7 +710,7 @@ impl ECStore {
         set: Arc<SetDisks>,
         // wk: Arc<Workers>,
     ) {
-        warn!("rebalance_entry: start rebalance_entry");
+        info!("rebalance_entry: start rebalance_entry");
 
         // defer!(|| async {
         //     warn!("rebalance_entry: defer give worker start");
@@ -719,12 +719,12 @@ impl ECStore {
         // });
 
         if entry.is_dir() {
-            warn!("rebalance_entry: entry is dir, skipping");
+            info!("rebalance_entry: entry is dir, skipping");
             return;
         }
 
         if self.check_if_rebalance_done(pool_index).await {
-            warn!("rebalance_entry: rebalance done, skipping pool {}", pool_index);
+            info!("rebalance_entry: rebalance done, skipping pool {}", pool_index);
             return;
         }
 
@@ -732,7 +732,7 @@ impl ECStore {
             Ok(fivs) => fivs,
             Err(err) => {
                 error!("rebalance_entry Error getting file info versions: {}", err);
-                warn!("rebalance_entry: Error getting file info versions, skipping");
+                info!("rebalance_entry: Error getting file info versions, skipping");
                 return;
             }
         };
@@ -743,7 +743,7 @@ impl ECStore {
         let expired: usize = 0;
         for version in fivs.versions.iter() {
             if version.is_remote() {
-                warn!("rebalance_entry Entry {} is remote, skipping", version.name);
+                info!("rebalance_entry Entry {} is remote, skipping", version.name);
                 continue;
             }
             // TODO: filterLifecycle
@@ -751,7 +751,7 @@ impl ECStore {
             let remaining_versions = fivs.versions.len() - expired;
             if version.deleted && remaining_versions == 1 {
                 rebalanced += 1;
-                warn!("rebalance_entry Entry {} is deleted and last version, skipping", version.name);
+                info!("rebalance_entry Entry {} is deleted and last version, skipping", version.name);
                 continue;
             }
             let version_id = version.version_id.map(|v| v.to_string());
@@ -802,7 +802,7 @@ impl ECStore {
             }
 
             for _i in 0..3 {
-                warn!("rebalance_entry: get_object_reader, bucket: {}, version: {}", &bucket, &version.name);
+                info!("rebalance_entry: get_object_reader, bucket: {}, version: {}", &bucket, &version.name);
                 let rd = match set
                     .get_object_reader(
                         bucket.as_str(),
@@ -821,7 +821,7 @@ impl ECStore {
                     Err(err) => {
                         if is_err_object_not_found(&err) || is_err_version_not_found(&err) {
                             ignore = true;
-                            warn!(
+                            info!(
                                 "rebalance_entry: get_object_reader, bucket: {}, version: {}, ignore",
                                 &bucket, &version.name
                             );
@@ -837,7 +837,7 @@ impl ECStore {
                 if let Err(err) = self.clone().rebalance_object(pool_index, bucket.clone(), rd).await {
                     if is_err_object_not_found(&err) || is_err_version_not_found(&err) || is_err_data_movement_overwrite(&err) {
                         ignore = true;
-                        warn!("rebalance_entry {} Entry {} is already deleted, skipping", &bucket, version.name);
+                        info!("rebalance_entry {} Entry {} is already deleted, skipping", &bucket, version.name);
                         break;
                     }
 
@@ -852,7 +852,7 @@ impl ECStore {
             }
 
             if ignore {
-                warn!("rebalance_entry {} Entry {} is already deleted, skipping", &bucket, version.name);
+                info!("rebalance_entry {} Entry {} is already deleted, skipping", &bucket, version.name);
                 continue;
             }
 
@@ -884,7 +884,7 @@ impl ECStore {
             {
                 error!("rebalance_entry: delete_object err {:?}", &err);
             } else {
-                warn!("rebalance_entry {} Entry {} deleted successfully", &bucket, &entry.name);
+                info!("rebalance_entry {} Entry {} deleted successfully", &bucket, &entry.name);
             }
         }
     }
@@ -1022,7 +1022,7 @@ impl ECStore {
     #[tracing::instrument(skip(self, rx))]
     async fn rebalance_bucket(self: &Arc<Self>, rx: B_Receiver<bool>, bucket: String, pool_index: usize) -> Result<()> {
         // Placeholder for actual bucket rebalance logic
-        warn!("Rebalancing bucket {} in pool {}", bucket, pool_index);
+        info!("Rebalancing bucket {} in pool {}", bucket, pool_index);
 
         // TODO: other config
         // if bucket != RUSTFS_META_BUCKET{
@@ -1047,12 +1047,12 @@ impl ECStore {
                 // let wk = wk.clone();
                 let set = set.clone();
                 Box::pin(async move {
-                    warn!("rebalance_entry: rebalance_entry spawn start");
+                    info!("rebalance_entry: rebalance_entry spawn start");
                     // wk.take().await;
                     // tokio::spawn(async move {
-                    warn!("rebalance_entry: rebalance_entry spawn start2");
+                    info!("rebalance_entry: rebalance_entry spawn start2");
                     this.rebalance_entry(bucket, pool_index, entry, set).await;
-                    warn!("rebalance_entry: rebalance_entry spawn done");
+                    info!("rebalance_entry: rebalance_entry spawn done");
                     // });
                 })
             }
@@ -1079,7 +1079,7 @@ impl ECStore {
         for job in jobs {
             job.await.unwrap();
         }
-        warn!("rebalance_bucket: rebalance_bucket done");
+        info!("rebalance_bucket: rebalance_bucket done");
 
         Ok(())
     }
@@ -1089,7 +1089,7 @@ impl ECStore {
         let mut meta = RebalanceMeta::new();
         if let Err(err) = meta.load(self.pools[0].clone()).await {
             if err != Error::ConfigNotFound {
-                warn!("save_rebalance_stats: load err: {:?}", err);
+                info!("save_rebalance_stats: load err: {:?}", err);
                 return Err(err);
             }
         }
@@ -1117,7 +1117,7 @@ impl ECStore {
             *rebalance_meta = Some(meta.clone());
         }
 
-        warn!(
+        info!(
             "save_rebalance_stats: save rebalance meta, pool_idx: {}, opt: {:?}, meta: {:?}",
             pool_idx, opt, meta
         );
@@ -1135,15 +1135,15 @@ impl SetDisks {
         bucket: String,
         cb: ListCallback,
     ) -> Result<()> {
-        warn!("list_objects_to_rebalance: start list_objects_to_rebalance");
+        info!("list_objects_to_rebalance: start list_objects_to_rebalance");
         // Placeholder for actual object listing logic
         let (disks, _) = self.get_online_disks_with_healing(false).await;
         if disks.is_empty() {
-            warn!("list_objects_to_rebalance: no disk available");
+            info!("list_objects_to_rebalance: no disk available");
             return Err(Error::other("errNoDiskAvailable"));
         }
-        warn!("list_objects_to_rebalance: get online disks with healing");
+        info!("list_objects_to_rebalance: get online disks with healing");
 
         let listing_quorum = self.set_drive_count.div_ceil(2);
         let resolver = MetadataResolutionParams {
@@ -1162,7 +1162,7 @@ impl SetDisks {
                 recursice: true,
                 min_disks: listing_quorum,
                 agreed: Some(Box::new(move |entry: MetaCacheEntry| {
-                    warn!("list_objects_to_rebalance: agreed: {:?}", &entry.name);
+                    info!("list_objects_to_rebalance: agreed: {:?}", &entry.name);
                     Box::pin(cb1(entry))
                 })),
                 partial: Some(Box::new(move |entries: MetaCacheEntries, _: &[Option<DiskError>]| {
@@ -1172,11 +1172,11 @@ impl SetDisks {
 
                     match entries.resolve(resolver) {
                         Some(entry) => {
-                            warn!("list_objects_to_rebalance: list_objects_to_decommission get {}", &entry.name);
+                            info!("list_objects_to_rebalance: list_objects_to_decommission get {}", &entry.name);
                             Box::pin(async move { cb(entry).await })
                         }
                         None => {
-                            warn!("list_objects_to_rebalance: list_objects_to_decommission get none");
+                            info!("list_objects_to_rebalance: list_objects_to_decommission get none");
                             Box::pin(async {})
                         }
                     }
@@ -1186,7 +1186,7 @@ impl SetDisks {
         )
         .await?;
 
-        warn!("list_objects_to_rebalance: list_objects_to_rebalance done");
+        info!("list_objects_to_rebalance: list_objects_to_rebalance done");
         Ok(())
     }
 }
diff --git a/ecstore/src/set_disk.rs b/ecstore/src/set_disk.rs
index 305b52fb..89c14b56 100644
--- a/ecstore/src/set_disk.rs
+++ b/ecstore/src/set_disk.rs
@@ -81,7 +81,7 @@ use rustfs_utils::{
 use s3s::header::X_AMZ_RESTORE;
 use sha2::{Digest, Sha256};
 use std::hash::Hash;
-use std::mem;
+use std::mem::{self};
 use std::time::SystemTime;
 use std::{
     collections::{HashMap, HashSet},
@@ -107,12 +107,12 @@ use workers::workers::Workers;
 
 pub const DEFAULT_READ_BUFFER_SIZE: usize = 1024 * 1024;
 
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct SetDisks {
     pub lockers: Vec<LockApi>,
     pub locker_owner: String,
     pub ns_mutex: Arc<RwLock<NsLockMap>>,
-    pub disks: RwLock<Vec<Option<DiskStore>>>,
+    pub disks: Arc<RwLock<Vec<Option<DiskStore>>>>,
     pub set_endpoints: Vec<Endpoint>,
     pub set_drive_count: usize,
     pub default_parity_count: usize,
@@ -122,12 +122,54 @@ pub struct SetDisks {
 }
 
 impl SetDisks {
+    #[allow(clippy::too_many_arguments)]
+    pub async fn new(
+        lockers: Vec<LockApi>,
+        locker_owner: String,
+        ns_mutex: Arc<RwLock<NsLockMap>>,
+        disks: Arc<RwLock<Vec<Option<DiskStore>>>>,
+        set_drive_count: usize,
+        default_parity_count: usize,
+        set_index: usize,
+        pool_index: usize,
+        set_endpoints: Vec<Endpoint>,
+        format: FormatV3,
+    ) -> Arc<Self> {
+        Arc::new(SetDisks {
+            lockers,
+            locker_owner,
+            ns_mutex,
+            disks,
+            set_drive_count,
+            default_parity_count,
+            set_index,
+            pool_index,
+            format,
+            set_endpoints,
+        })
+    }
 
     async fn get_disks_internal(&self) -> Vec<Option<DiskStore>> {
         let rl = self.disks.read().await;
 
         rl.clone()
     }
 
+    pub async fn get_local_disks(&self) -> Vec<Option<DiskStore>> {
+        let rl = self.disks.read().await;
+
+        let mut disks: Vec<Option<DiskStore>> = rl
+            .clone()
+            .into_iter()
+            .filter(|v| v.as_ref().is_some_and(|d| d.is_local()))
+            .collect();
+
+        let mut rng = rand::rng();
+
+        disks.shuffle(&mut rng);
+
+        disks
+    }
+
     async fn get_online_disks(&self) -> Vec<Option<DiskStore>> {
         let mut disks = self.get_disks_internal().await;
diff --git a/ecstore/src/sets.rs b/ecstore/src/sets.rs
index 71d7382b..2b0f31be 100644
--- a/ecstore/src/sets.rs
+++ b/ecstore/src/sets.rs
@@ -35,8 +35,8 @@ use tokio::sync::RwLock;
 use uuid::Uuid;
 
 use crate::heal::heal_ops::HealSequence;
+use tokio::sync::broadcast::{Receiver, Sender};
 use tokio::time::Duration;
-use tokio_util::sync::CancellationToken;
 use tracing::warn;
 use tracing::{error, info};
 
@@ -55,7 +55,15 @@ pub struct Sets {
     pub set_drive_count: usize,
     pub default_parity_count: usize,
     pub distribution_algo: DistributionAlgoVersion,
-    ctx: CancellationToken,
+    exit_signal: Option<Sender<()>>,
+}
+
+impl Drop for Sets {
+    fn drop(&mut self) {
+        if let Some(exit_signal) = self.exit_signal.take() {
+            let _ = exit_signal.send(());
+        }
+    }
 }
 
 impl Sets {
@@ -144,22 +152,25 @@ impl Sets {
 
             // warn!("sets new set_drive {:?}", &set_drive);
 
-            let set_disks = SetDisks {
-                lockers: locker.clone(),
-                locker_owner: GLOBAL_Local_Node_Name.read().await.to_string(),
-                ns_mutex: Arc::new(RwLock::new(NsLockMap::new(is_dist_erasure().await))),
-                disks: RwLock::new(set_drive),
+            let set_disks = SetDisks::new(
+                locker.clone(),
+                GLOBAL_Local_Node_Name.read().await.to_string(),
+                Arc::new(RwLock::new(NsLockMap::new(is_dist_erasure().await))),
+                Arc::new(RwLock::new(set_drive)),
                 set_drive_count,
-                default_parity_count: partiy_count,
-                set_index: i,
-                pool_index: pool_idx,
+                partiy_count,
+                i,
+                pool_idx,
                 set_endpoints,
-                format: fm.clone(),
-            };
+                fm.clone(),
+            )
+            .await;
 
-            disk_set.push(Arc::new(set_disks));
+            disk_set.push(set_disks);
         }
 
+        let (tx, rx) = tokio::sync::broadcast::channel(1);
+
         let sets = Arc::new(Self {
             id: fm.id,
             // sets: todo!(),
@@ -173,12 +184,17 @@ impl Sets {
             set_drive_count,
             default_parity_count: partiy_count,
             distribution_algo: fm.erasure.distribution_algo.clone(),
-            ctx: CancellationToken::new(),
+            exit_signal: Some(tx),
         });
 
         let asets = sets.clone();
 
-        tokio::spawn(async move { asets.monitor_and_connect_endpoints().await });
+        let rx1 = rx.resubscribe();
+        tokio::spawn(async move { asets.monitor_and_connect_endpoints(rx1).await });
+
+        // let sets2 = sets.clone();
+        // let rx2 = rx.resubscribe();
+        // tokio::spawn(async move { sets2.cleanup_deleted_objects_loop(rx2).await });
 
         Ok(sets)
     }
@@ -186,7 +202,38 @@ impl Sets {
     pub fn set_drive_count(&self) -> usize {
         self.set_drive_count
     }
-    pub async fn monitor_and_connect_endpoints(&self) {
+
+    // pub async fn cleanup_deleted_objects_loop(self: Arc<Self>, mut rx: Receiver<()>) {
+    //     tokio::time::sleep(Duration::from_secs(5)).await;
+
+    //     info!("start cleanup_deleted_objects_loop");
+
+    //     // TODO: config interval
+    //     let mut interval = tokio::time::interval(Duration::from_secs(15 * 3));
+    //     loop {
+    //         tokio::select! {
+    //             _= interval.tick()=>{
+
+    //                 info!("cleanup_deleted_objects_loop tick");
+
+    //                 for set in self.disk_set.iter() {
+    //                     set.clone().cleanup_deleted_objects().await;
+    //                 }
+
+    //                 interval.reset();
+    //             },
+
+    //             _ = rx.recv() => {
+    //                 warn!("cleanup_deleted_objects_loop ctx cancelled");
+    //                 break;
+    //             }
+    //         }
+    //     }
+
+    //     warn!("cleanup_deleted_objects_loop exit");
+    // }
+
+    pub async fn monitor_and_connect_endpoints(&self, mut rx: Receiver<()>) {
         tokio::time::sleep(Duration::from_secs(5)).await;
 
         info!("start monitor_and_connect_endpoints");
@@ -195,7 +242,6 @@ impl Sets {
 
         // TODO: config interval
         let mut interval = tokio::time::interval(Duration::from_secs(15 * 3));
-        let cloned_token = self.ctx.clone();
         loop {
             tokio::select! {
                 _= interval.tick()=>{
@@ -205,7 +251,7 @@ impl Sets {
 
                     interval.reset();
                 },
-                _ = cloned_token.cancelled() => {
+                _ = rx.recv() => {
                     warn!("monitor_and_connect_endpoints ctx cancelled");
                     break;
                 }
diff --git a/ecstore/src/store_api.rs b/ecstore/src/store_api.rs
index ebd4706f..874db273 100644
--- a/ecstore/src/store_api.rs
+++ b/ecstore/src/store_api.rs
@@ -134,13 +134,11 @@ impl GetObjectReader {
                     return Err(Error::other(format!("invalid decompressed size {}", actual_size)));
                 };
 
-                warn!("actual_size: {}", actual_size);
                 let dec_reader = LimitReader::new(dec_reader, actual_size);
 
                 let mut oi = oi.clone();
                 oi.size = dec_length;
 
-                warn!("oi.size: {}, off: {}, length: {}", oi.size, off, length);
                 return Ok((
                     GetObjectReader {
                         stream: Box::new(dec_reader),
diff --git a/ecstore/src/store_init.rs b/ecstore/src/store_init.rs
index 97c23cb0..31fe9d43 100644
--- a/ecstore/src/store_init.rs
+++ b/ecstore/src/store_init.rs
@@ -15,7 +15,7 @@ use crate::{
 
 use futures::future::join_all;
 use std::collections::{HashMap, hash_map::Entry};
-use tracing::{debug, warn};
+use tracing::{debug, info, warn};
 use uuid::Uuid;
 
 pub async fn init_disks(eps: &Endpoints, opt: &DiskOption) -> (Vec<Option<DiskStore>>, Vec<Option<DiskError>>) {
@@ -52,10 +52,9 @@ pub async fn connect_load_init_formats(
     set_drive_count: usize,
     deployment_id: Option<Uuid>,
 ) -> Result<FormatV3> {
-    warn!("connect_load_init_formats first_disk: {}", first_disk);
     let (formats, errs) = load_format_erasure_all(disks, false).await;
 
-    debug!("load_format_erasure_all errs {:?}", &errs);
+    // debug!("load_format_erasure_all errs {:?}", &errs);
 
     check_disk_fatal_errs(&errs)?;
 
@@ -63,14 +62,14 @@ pub async fn connect_load_init_formats(
 
     if first_disk && should_init_erasure_disks(&errs) {
         // UnformattedDisk, not format file create
-        warn!("first_disk && should_init_erasure_disks");
+        info!("first_disk && should_init_erasure_disks");
 
         // new format and save
         let fm = init_format_erasure(disks, set_count, set_drive_count, deployment_id).await?;
 
         return Ok(fm);
     }
 
-    warn!(
+    info!(
         "first_disk: {}, should_init_erasure_disks: {}",
         first_disk,
         should_init_erasure_disks(&errs)
diff --git a/ecstore/src/tier/tier.rs b/ecstore/src/tier/tier.rs
index 9c929ef7..e9ed6861 100644
--- a/ecstore/src/tier/tier.rs
+++ b/ecstore/src/tier/tier.rs
@@ -458,5 +458,5 @@ async fn load_tier_config(api: Arc<ECStore>) -> std::result::Result
 
 fn is_not_found_err(err: &StorageError) -> bool {
-    matches!(err, StorageError::ObjectNotFound(_, _))
+    matches!(err, StorageError::ObjectNotFound(_, _)) || err == &StorageError::ConfigNotFound
 }