Deleted objects are moved to the trash and automatically cleaned up in the background

weisd
2025-06-20 17:57:05 +08:00
parent 37b4ab935d
commit 7a7ca6e154
8 changed files with 292 additions and 132 deletions
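For context, here is a minimal, self-contained sketch of the pattern this commit wires into LocalDisk: a broadcast channel held on the disk acts as a shutdown signal, a spawned task periodically purges the trash directory, and Drop sends the signal so the task stops when the disk goes away. The names TrashCleaner and purge_trash are illustrative stand-ins for LocalDisk::cleanup_deleted_objects_loop and cleanup_deleted_objects; the real code targets the RUSTFS_META_TMP_DELETED_BUCKET directory on a 5-minute interval.

use std::io::ErrorKind;
use std::path::{Path, PathBuf};
use std::time::Duration;
use tokio::{fs, sync::broadcast, time::interval};

/// Illustrative stand-in for LocalDisk: owns the shutdown sender so that
/// dropping the value stops the background cleanup task.
struct TrashCleaner {
    exit_tx: broadcast::Sender<()>,
}

impl TrashCleaner {
    /// Spawn the background loop; must be called from within a Tokio runtime.
    fn start(trash_dir: PathBuf, every: Duration) -> Self {
        let (exit_tx, mut exit_rx) = broadcast::channel(1);
        tokio::spawn(async move {
            let mut ticker = interval(every);
            loop {
                tokio::select! {
                    _ = ticker.tick() => {
                        if let Err(err) = purge_trash(&trash_dir).await {
                            eprintln!("trash cleanup failed: {err:?}");
                        }
                    }
                    // The owning value was dropped; stop the loop.
                    _ = exit_rx.recv() => break,
                }
            }
        });
        Self { exit_tx }
    }
}

impl Drop for TrashCleaner {
    fn drop(&mut self) {
        // Best effort: the task may already be gone, so ignore send errors.
        let _ = self.exit_tx.send(());
    }
}

/// Remove every entry under the trash directory, tolerating entries that
/// disappear between listing and removal.
async fn purge_trash(trash: &Path) -> std::io::Result<()> {
    let mut entries = fs::read_dir(trash).await?;
    while let Some(entry) = entries.next_entry().await? {
        let path = entry.path();
        let res = if entry.file_type().await?.is_dir() {
            fs::remove_dir_all(&path).await
        } else {
            fs::remove_file(&path).await
        };
        if let Err(e) = res {
            if e.kind() != ErrorKind::NotFound {
                return Err(e);
            }
        }
    }
    Ok(())
}

Holding the broadcast Sender on the disk and signalling it from Drop means the cleanup task shuts down as soon as the last owner of the disk is released, without any explicit shutdown call.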

View File

@@ -18,7 +18,7 @@ use crate::disk::fs::{
use crate::disk::os::{check_path_length, is_empty_dir};
use crate::disk::{
CHECK_PART_FILE_CORRUPT, CHECK_PART_FILE_NOT_FOUND, CHECK_PART_SUCCESS, CHECK_PART_UNKNOWN, CHECK_PART_VOLUME_NOT_FOUND,
FileReader, conv_part_err_to_int,
FileReader, RUSTFS_META_TMP_DELETED_BUCKET, conv_part_err_to_int,
};
use crate::disk::{FileWriter, STORAGE_FORMAT_FILE};
use crate::global::{GLOBAL_IsErasureSD, GLOBAL_RootDiskThreshold};
@@ -36,6 +36,7 @@ use rustfs_utils::path::{
GLOBAL_DIR_SUFFIX, GLOBAL_DIR_SUFFIX_WITH_SLASH, SLASH_SEPARATOR, clean, decode_dir_object, encode_dir_object, has_suffix,
path_join, path_join_buf,
};
use tokio::time::interval;
use crate::erasure_coding::bitrot_verify;
use bytes::Bytes;
@@ -105,6 +106,15 @@ pub struct LocalDisk {
// pub format_data: Mutex<Vec<u8>>,
// pub format_file_info: Mutex<Option<Metadata>>,
// pub format_last_check: Mutex<Option<OffsetDateTime>>,
exit_signal: Option<tokio::sync::broadcast::Sender<()>>,
}
impl Drop for LocalDisk {
fn drop(&mut self) {
if let Some(exit_signal) = self.exit_signal.take() {
let _ = exit_signal.send(());
}
}
}
impl Debug for LocalDisk {
@@ -209,6 +219,7 @@ impl LocalDisk {
// format_file_info: Mutex::new(format_meta),
// format_data: Mutex::new(format_data),
// format_last_check: Mutex::new(format_last_check),
exit_signal: None,
};
let (info, _root) = get_disk_info(root).await?;
disk.major = info.major;
@@ -229,9 +240,61 @@ impl LocalDisk {
disk.make_meta_volumes().await?;
let (exit_tx, exit_rx) = tokio::sync::broadcast::channel(1);
disk.exit_signal = Some(exit_tx);
let root = disk.root.clone();
tokio::spawn(Self::cleanup_deleted_objects_loop(root, exit_rx));
Ok(disk)
}
async fn cleanup_deleted_objects_loop(root: PathBuf, mut exit_rx: tokio::sync::broadcast::Receiver<()>) {
let mut interval = interval(Duration::from_secs(60 * 5));
loop {
tokio::select! {
_ = interval.tick() => {
if let Err(err) = Self::cleanup_deleted_objects(root.clone()).await {
error!("cleanup_deleted_objects error: {:?}", err);
}
}
_ = exit_rx.recv() => {
info!("cleanup_deleted_objects_loop exit");
break;
}
}
}
}
async fn cleanup_deleted_objects(root: PathBuf) -> Result<()> {
let trash = path_join(&[root, RUSTFS_META_TMP_DELETED_BUCKET.into()]);
let mut entries = fs::read_dir(&trash).await?;
while let Some(entry) = entries.next_entry().await? {
let name = entry.file_name().to_string_lossy().to_string();
if name.is_empty() || name == "." || name == ".." {
continue;
}
let file_type = entry.file_type().await?;
let path = path_join(&[trash.clone(), name.into()]);
if file_type.is_dir() {
if let Err(e) = tokio::fs::remove_dir_all(path).await {
if e.kind() != ErrorKind::NotFound {
return Err(e.into());
}
}
} else if let Err(e) = tokio::fs::remove_file(path).await {
if e.kind() != ErrorKind::NotFound {
return Err(e.into());
}
}
}
Ok(())
}
fn is_valid_volname(volname: &str) -> bool {
if volname.len() < 3 {
return false;
@@ -268,7 +331,14 @@ impl LocalDisk {
let multipart = format!("{}/{}", RUSTFS_META_BUCKET, "multipart");
let config = format!("{}/{}", RUSTFS_META_BUCKET, "config");
let tmp = format!("{}/{}", RUSTFS_META_BUCKET, "tmp");
let defaults = vec![buckets.as_str(), multipart.as_str(), config.as_str(), tmp.as_str()];
let defaults = vec![
buckets.as_str(),
multipart.as_str(),
config.as_str(),
tmp.as_str(),
RUSTFS_META_TMP_DELETED_BUCKET,
];
self.make_volumes(defaults).await
}
@@ -308,22 +378,22 @@ impl LocalDisk {
#[allow(unreachable_code)]
#[allow(unused_variables)]
pub async fn move_to_trash(&self, delete_path: &PathBuf, recursive: bool, immediate_purge: bool) -> Result<()> {
if recursive {
remove_all_std(delete_path).map_err(to_volume_error)?;
} else {
remove_std(delete_path).map_err(to_file_error)?;
}
// if recursive {
// remove_all_std(delete_path).map_err(to_volume_error)?;
// } else {
// remove_std(delete_path).map_err(to_file_error)?;
// }
return Ok(());
// return Ok(());
// TODO: notify asynchronously, check free disk space, and empty the trash
let trash_path = self.get_object_path(super::RUSTFS_META_TMP_DELETED_BUCKET, Uuid::new_v4().to_string().as_str())?;
if let Some(parent) = trash_path.parent() {
if !parent.exists() {
fs::create_dir_all(parent).await?;
}
}
// if let Some(parent) = trash_path.parent() {
// if !parent.exists() {
// fs::create_dir_all(parent).await?;
// }
// }
let err = if recursive {
rename_all(delete_path, trash_path, self.get_bucket_path(super::RUSTFS_META_TMP_DELETED_BUCKET)?)

View File

@@ -6,6 +6,7 @@ use std::{
use super::error::Result;
use crate::disk::error_conv::to_file_error;
use tokio::fs;
use tracing::warn;
use super::error::DiskError;
@@ -113,7 +114,7 @@ pub async fn rename_all(
Ok(())
}
pub async fn reliable_rename(
async fn reliable_rename(
src_file_path: impl AsRef<Path>,
dst_file_path: impl AsRef<Path>,
base_dir: impl AsRef<Path>,
@@ -128,17 +129,21 @@ pub async fn reliable_rename(
let mut i = 0;
loop {
if let Err(e) = super::fs::rename_std(src_file_path.as_ref(), dst_file_path.as_ref()) {
if e.kind() == io::ErrorKind::NotFound && i == 0 {
if e.kind() == io::ErrorKind::NotFound {
break;
}
if i == 0 {
i += 1;
continue;
}
// info!(
// "reliable_rename failed. src_file_path: {:?}, dst_file_path: {:?}, base_dir: {:?}, err: {:?}",
// src_file_path.as_ref(),
// dst_file_path.as_ref(),
// base_dir.as_ref(),
// e
// );
warn!(
"reliable_rename failed. src_file_path: {:?}, dst_file_path: {:?}, base_dir: {:?}, err: {:?}",
src_file_path.as_ref(),
dst_file_path.as_ref(),
base_dir.as_ref(),
e
);
return Err(e);
}

View File

@@ -21,7 +21,7 @@ use time::OffsetDateTime;
use tokio::io::{AsyncReadExt, BufReader};
use tokio::sync::broadcast::{self, Receiver as B_Receiver};
use tokio::time::{Duration, Instant};
use tracing::{error, info, warn};
use tracing::{error, info};
use uuid::Uuid;
const REBAL_META_FMT: u16 = 1; // Replace with actual format value
@@ -162,7 +162,7 @@ impl RebalanceMeta {
pub async fn load_with_opts<S: StorageAPI>(&mut self, store: Arc<S>, opts: ObjectOptions) -> Result<()> {
let (data, _) = read_config_with_metadata(store, REBAL_META_NAME, &opts).await?;
if data.is_empty() {
warn!("rebalanceMeta load_with_opts: no data");
info!("rebalanceMeta load_with_opts: no data");
return Ok(());
}
if data.len() <= 4 {
@@ -184,7 +184,7 @@ impl RebalanceMeta {
self.last_refreshed_at = Some(OffsetDateTime::now_utc());
warn!("rebalanceMeta load_with_opts: loaded meta done");
info!("rebalanceMeta load_with_opts: loaded meta done");
Ok(())
}
@@ -194,7 +194,7 @@ impl RebalanceMeta {
pub async fn save_with_opts<S: StorageAPI>(&self, store: Arc<S>, opts: ObjectOptions) -> Result<()> {
if self.pool_stats.is_empty() {
warn!("rebalanceMeta save_with_opts: no pool stats");
info!("rebalanceMeta save_with_opts: no pool stats");
return Ok(());
}
@@ -217,10 +217,10 @@ impl ECStore {
#[tracing::instrument(skip_all)]
pub async fn load_rebalance_meta(&self) -> Result<()> {
let mut meta = RebalanceMeta::new();
warn!("rebalanceMeta: store load rebalance meta");
info!("rebalanceMeta: store load rebalance meta");
match meta.load(self.pools[0].clone()).await {
Ok(_) => {
warn!("rebalanceMeta: rebalance meta loaded0");
info!("rebalanceMeta: rebalance meta loaded0");
{
let mut rebalance_meta = self.rebalance_meta.write().await;
@@ -229,12 +229,12 @@ impl ECStore {
drop(rebalance_meta);
}
warn!("rebalanceMeta: rebalance meta loaded1");
info!("rebalanceMeta: rebalance meta loaded1");
if let Err(err) = self.update_rebalance_stats().await {
error!("Failed to update rebalance stats: {}", err);
} else {
warn!("rebalanceMeta: rebalance meta loaded2");
info!("rebalanceMeta: rebalance meta loaded2");
}
}
Err(err) => {
@@ -243,7 +243,7 @@ impl ECStore {
return Err(err);
}
warn!("rebalanceMeta: not found, rebalance not started");
info!("rebalanceMeta: not found, rebalance not started");
}
}
@@ -259,13 +259,13 @@ impl ECStore {
rebalance_meta.as_ref().map(|v| v.pool_stats.clone()).unwrap_or_default()
};
warn!("update_rebalance_stats: pool_stats: {:?}", &pool_stats);
info!("update_rebalance_stats: pool_stats: {:?}", &pool_stats);
for i in 0..self.pools.len() {
if pool_stats.get(i).is_none() {
warn!("update_rebalance_stats: pool {} not found", i);
info!("update_rebalance_stats: pool {} not found", i);
let mut rebalance_meta = self.rebalance_meta.write().await;
warn!("update_rebalance_stats: pool {} not found, add", i);
info!("update_rebalance_stats: pool {} not found, add", i);
if let Some(meta) = rebalance_meta.as_mut() {
meta.pool_stats.push(RebalanceStats::default());
}
@@ -275,7 +275,7 @@ impl ECStore {
}
if ok {
warn!("update_rebalance_stats: save rebalance meta");
info!("update_rebalance_stats: save rebalance meta");
let rebalance_meta = self.rebalance_meta.read().await;
if let Some(meta) = rebalance_meta.as_ref() {
@@ -296,7 +296,7 @@ impl ECStore {
#[tracing::instrument(skip(self))]
pub async fn init_rebalance_meta(&self, bucktes: Vec<String>) -> Result<String> {
warn!("init_rebalance_meta: start rebalance");
info!("init_rebalance_meta: start rebalance");
let si = self.storage_info().await;
let mut disk_stats = vec![DiskStat::default(); self.pools.len()];
@@ -351,7 +351,7 @@ impl ECStore {
meta.save(self.pools[0].clone()).await?;
warn!("init_rebalance_meta: rebalance meta saved");
info!("init_rebalance_meta: rebalance meta saved");
let id = meta.id.clone();
@@ -378,26 +378,26 @@ impl ECStore {
#[tracing::instrument(skip(self))]
pub async fn next_rebal_bucket(&self, pool_index: usize) -> Result<Option<String>> {
warn!("next_rebal_bucket: pool_index: {}", pool_index);
info!("next_rebal_bucket: pool_index: {}", pool_index);
let rebalance_meta = self.rebalance_meta.read().await;
warn!("next_rebal_bucket: rebalance_meta: {:?}", rebalance_meta);
info!("next_rebal_bucket: rebalance_meta: {:?}", rebalance_meta);
if let Some(meta) = rebalance_meta.as_ref() {
if let Some(pool_stat) = meta.pool_stats.get(pool_index) {
if pool_stat.info.status == RebalStatus::Completed || !pool_stat.participating {
warn!("next_rebal_bucket: pool_index: {} completed or not participating", pool_index);
info!("next_rebal_bucket: pool_index: {} completed or not participating", pool_index);
return Ok(None);
}
if pool_stat.buckets.is_empty() {
warn!("next_rebal_bucket: pool_index: {} buckets is empty", pool_index);
info!("next_rebal_bucket: pool_index: {} buckets is empty", pool_index);
return Ok(None);
}
warn!("next_rebal_bucket: pool_index: {} bucket: {}", pool_index, pool_stat.buckets[0]);
info!("next_rebal_bucket: pool_index: {} bucket: {}", pool_index, pool_stat.buckets[0]);
return Ok(Some(pool_stat.buckets[0].clone()));
}
}
warn!("next_rebal_bucket: pool_index: {} None", pool_index);
info!("next_rebal_bucket: pool_index: {} None", pool_index);
Ok(None)
}
@@ -406,7 +406,7 @@ impl ECStore {
let mut rebalance_meta = self.rebalance_meta.write().await;
if let Some(meta) = rebalance_meta.as_mut() {
if let Some(pool_stat) = meta.pool_stats.get_mut(pool_index) {
warn!("bucket_rebalance_done: buckets {:?}", &pool_stat.buckets);
info!("bucket_rebalance_done: buckets {:?}", &pool_stat.buckets);
// Use retain to filter out the bucket being removed
let mut found = false;
@@ -421,14 +421,14 @@ impl ECStore {
});
if found {
warn!("bucket_rebalance_done: bucket {} rebalanced", &bucket);
info!("bucket_rebalance_done: bucket {} rebalanced", &bucket);
return Ok(());
} else {
warn!("bucket_rebalance_done: bucket {} not found", bucket);
info!("bucket_rebalance_done: bucket {} not found", bucket);
}
}
}
warn!("bucket_rebalance_done: bucket {} not found", bucket);
info!("bucket_rebalance_done: bucket {} not found", bucket);
Ok(())
}
@@ -436,12 +436,12 @@ impl ECStore {
let rebalance_meta = self.rebalance_meta.read().await;
if let Some(ref meta) = *rebalance_meta {
if meta.stopped_at.is_some() {
warn!("is_rebalance_started: rebalance stopped");
info!("is_rebalance_started: rebalance stopped");
return false;
}
meta.pool_stats.iter().enumerate().for_each(|(i, v)| {
warn!(
info!(
"is_rebalance_started: pool_index: {}, participating: {:?}, status: {:?}",
i, v.participating, v.info.status
);
@@ -452,12 +452,12 @@ impl ECStore {
.iter()
.any(|v| v.participating && v.info.status != RebalStatus::Completed)
{
warn!("is_rebalance_started: rebalance started");
info!("is_rebalance_started: rebalance started");
return true;
}
}
warn!("is_rebalance_started: rebalance not started");
info!("is_rebalance_started: rebalance not started");
false
}
@@ -490,7 +490,7 @@ impl ECStore {
#[tracing::instrument(skip_all)]
pub async fn start_rebalance(self: &Arc<Self>) {
warn!("start_rebalance: start rebalance");
info!("start_rebalance: start rebalance");
// let rebalance_meta = self.rebalance_meta.read().await;
let (tx, rx) = broadcast::channel::<bool>(1);
@@ -501,7 +501,7 @@ impl ECStore {
if let Some(meta) = rebalance_meta.as_mut() {
meta.cancel = Some(tx)
} else {
warn!("start_rebalance: rebalance_meta is None exit");
info!("start_rebalance: rebalance_meta is None exit");
return;
}
@@ -517,36 +517,36 @@ impl ECStore {
let mut participants = vec![false; meta.pool_stats.len()];
for (i, pool_stat) in meta.pool_stats.iter().enumerate() {
warn!("start_rebalance: pool {} status: {:?}", i, pool_stat.info.status);
info!("start_rebalance: pool {} status: {:?}", i, pool_stat.info.status);
if pool_stat.info.status != RebalStatus::Started {
warn!("start_rebalance: pool {} not started, skipping", i);
info!("start_rebalance: pool {} not started, skipping", i);
continue;
}
warn!("start_rebalance: pool {} participating: {:?}", i, pool_stat.participating);
info!("start_rebalance: pool {} participating: {:?}", i, pool_stat.participating);
participants[i] = pool_stat.participating;
}
participants
} else {
warn!("start_rebalance:2 rebalance_meta is None exit");
info!("start_rebalance:2 rebalance_meta is None exit");
Vec::new()
}
};
for (idx, participating) in participants.iter().enumerate() {
if !*participating {
warn!("start_rebalance: pool {} is not participating, skipping", idx);
info!("start_rebalance: pool {} is not participating, skipping", idx);
continue;
}
if !get_global_endpoints().as_ref().get(idx).is_some_and(|v| {
warn!("start_rebalance: pool {} endpoints: {:?}", idx, v.endpoints);
info!("start_rebalance: pool {} endpoints: {:?}", idx, v.endpoints);
v.endpoints.as_ref().first().is_some_and(|e| {
warn!("start_rebalance: pool {} endpoint: {:?}, is_local: {}", idx, e, e.is_local);
info!("start_rebalance: pool {} endpoint: {:?}, is_local: {}", idx, e, e.is_local);
e.is_local
})
}) {
warn!("start_rebalance: pool {} is not local, skipping", idx);
info!("start_rebalance: pool {} is not local, skipping", idx);
continue;
}
@@ -562,7 +562,7 @@ impl ECStore {
});
}
warn!("start_rebalance: rebalance started done");
info!("start_rebalance: rebalance started done");
}
#[tracing::instrument(skip(self, rx))]
@@ -585,11 +585,11 @@ impl ECStore {
let state = match result {
Ok(_) => {
warn!("rebalance_buckets: completed");
info!("rebalance_buckets: completed");
msg = format!("Rebalance completed at {:?}", now);
RebalStatus::Completed},
Err(err) => {
warn!("rebalance_buckets: error: {:?}", err);
info!("rebalance_buckets: error: {:?}", err);
// TODO: check stop
if err.to_string().contains("canceled") {
msg = format!("Rebalance stopped at {:?}", now);
@@ -602,11 +602,11 @@ impl ECStore {
};
{
warn!("rebalance_buckets: save rebalance meta, pool_index: {}, state: {:?}", pool_index, state);
info!("rebalance_buckets: save rebalance meta, pool_index: {}, state: {:?}", pool_index, state);
let mut rebalance_meta = store.rebalance_meta.write().await;
if let Some(rbm) = rebalance_meta.as_mut() {
warn!("rebalance_buckets: save rebalance meta2, pool_index: {}, state: {:?}", pool_index, state);
info!("rebalance_buckets: save rebalance meta2, pool_index: {}, state: {:?}", pool_index, state);
rbm.pool_stats[pool_index].info.status = state;
rbm.pool_stats[pool_index].info.end_time = Some(now);
}
@@ -623,11 +623,11 @@ impl ECStore {
if let Err(err) = store.save_rebalance_stats(pool_index, RebalSaveOpt::Stats).await {
error!("{} err: {:?}", msg, err);
} else {
warn!(msg);
info!(msg);
}
if quit {
warn!("{}: exiting save_task", msg);
info!("{}: exiting save_task", msg);
return;
}
@@ -635,21 +635,21 @@ impl ECStore {
}
});
warn!("Pool {} rebalancing is started", pool_index);
info!("Pool {} rebalancing is started", pool_index);
loop {
if let Ok(true) = rx.try_recv() {
warn!("Pool {} rebalancing is stopped", pool_index);
info!("Pool {} rebalancing is stopped", pool_index);
done_tx.send(Err(Error::other("rebalance stopped canceled"))).await.ok();
break;
}
if let Some(bucket) = self.next_rebal_bucket(pool_index).await? {
warn!("Rebalancing bucket: start {}", bucket);
info!("Rebalancing bucket: start {}", bucket);
if let Err(err) = self.rebalance_bucket(rx.resubscribe(), bucket.clone(), pool_index).await {
if err.to_string().contains("not initialized") {
warn!("rebalance_bucket: rebalance not initialized, continue");
info!("rebalance_bucket: rebalance not initialized, continue");
continue;
}
error!("Error rebalancing bucket {}: {:?}", bucket, err);
@@ -657,19 +657,19 @@ impl ECStore {
break;
}
warn!("Rebalance bucket: done {} ", bucket);
info!("Rebalance bucket: done {} ", bucket);
self.bucket_rebalance_done(pool_index, bucket).await?;
} else {
warn!("Rebalance bucket: no bucket to rebalance");
info!("Rebalance bucket: no bucket to rebalance");
break;
}
}
warn!("Pool {} rebalancing is done", pool_index);
info!("Pool {} rebalancing is done", pool_index);
done_tx.send(Ok(())).await.ok();
save_task.await.ok();
warn!("Pool {} rebalancing is done2", pool_index);
info!("Pool {} rebalancing is done2", pool_index);
Ok(())
}
@@ -680,7 +680,7 @@ impl ECStore {
if let Some(pool_stat) = meta.pool_stats.get_mut(pool_index) {
// Check if the pool's rebalance status is already completed
if pool_stat.info.status == RebalStatus::Completed {
warn!("check_if_rebalance_done: pool {} is already completed", pool_index);
info!("check_if_rebalance_done: pool {} is already completed", pool_index);
return true;
}
@@ -691,7 +691,7 @@ impl ECStore {
if (pfi - meta.percent_free_goal).abs() <= 0.05 {
pool_stat.info.status = RebalStatus::Completed;
pool_stat.info.end_time = Some(OffsetDateTime::now_utc());
warn!("check_if_rebalance_done: pool {} is completed, pfi: {}", pool_index, pfi);
info!("check_if_rebalance_done: pool {} is completed, pfi: {}", pool_index, pfi);
return true;
}
}
@@ -710,7 +710,7 @@ impl ECStore {
set: Arc<SetDisks>,
// wk: Arc<Workers>,
) {
warn!("rebalance_entry: start rebalance_entry");
info!("rebalance_entry: start rebalance_entry");
// defer!(|| async {
// warn!("rebalance_entry: defer give worker start");
@@ -719,12 +719,12 @@ impl ECStore {
// });
if entry.is_dir() {
warn!("rebalance_entry: entry is dir, skipping");
info!("rebalance_entry: entry is dir, skipping");
return;
}
if self.check_if_rebalance_done(pool_index).await {
warn!("rebalance_entry: rebalance done, skipping pool {}", pool_index);
info!("rebalance_entry: rebalance done, skipping pool {}", pool_index);
return;
}
@@ -732,7 +732,7 @@ impl ECStore {
Ok(fivs) => fivs,
Err(err) => {
error!("rebalance_entry Error getting file info versions: {}", err);
warn!("rebalance_entry: Error getting file info versions, skipping");
info!("rebalance_entry: Error getting file info versions, skipping");
return;
}
};
@@ -743,7 +743,7 @@ impl ECStore {
let expired: usize = 0;
for version in fivs.versions.iter() {
if version.is_remote() {
warn!("rebalance_entry Entry {} is remote, skipping", version.name);
info!("rebalance_entry Entry {} is remote, skipping", version.name);
continue;
}
// TODO: filterLifecycle
@@ -751,7 +751,7 @@ impl ECStore {
let remaining_versions = fivs.versions.len() - expired;
if version.deleted && remaining_versions == 1 {
rebalanced += 1;
warn!("rebalance_entry Entry {} is deleted and last version, skipping", version.name);
info!("rebalance_entry Entry {} is deleted and last version, skipping", version.name);
continue;
}
let version_id = version.version_id.map(|v| v.to_string());
@@ -802,7 +802,7 @@ impl ECStore {
}
for _i in 0..3 {
warn!("rebalance_entry: get_object_reader, bucket: {}, version: {}", &bucket, &version.name);
info!("rebalance_entry: get_object_reader, bucket: {}, version: {}", &bucket, &version.name);
let rd = match set
.get_object_reader(
bucket.as_str(),
@@ -821,7 +821,7 @@ impl ECStore {
Err(err) => {
if is_err_object_not_found(&err) || is_err_version_not_found(&err) {
ignore = true;
warn!(
info!(
"rebalance_entry: get_object_reader, bucket: {}, version: {}, ignore",
&bucket, &version.name
);
@@ -837,7 +837,7 @@ impl ECStore {
if let Err(err) = self.clone().rebalance_object(pool_index, bucket.clone(), rd).await {
if is_err_object_not_found(&err) || is_err_version_not_found(&err) || is_err_data_movement_overwrite(&err) {
ignore = true;
warn!("rebalance_entry {} Entry {} is already deleted, skipping", &bucket, version.name);
info!("rebalance_entry {} Entry {} is already deleted, skipping", &bucket, version.name);
break;
}
@@ -852,7 +852,7 @@ impl ECStore {
}
if ignore {
warn!("rebalance_entry {} Entry {} is already deleted, skipping", &bucket, version.name);
info!("rebalance_entry {} Entry {} is already deleted, skipping", &bucket, version.name);
continue;
}
@@ -884,7 +884,7 @@ impl ECStore {
{
error!("rebalance_entry: delete_object err {:?}", &err);
} else {
warn!("rebalance_entry {} Entry {} deleted successfully", &bucket, &entry.name);
info!("rebalance_entry {} Entry {} deleted successfully", &bucket, &entry.name);
}
}
}
@@ -1022,7 +1022,7 @@ impl ECStore {
#[tracing::instrument(skip(self, rx))]
async fn rebalance_bucket(self: &Arc<Self>, rx: B_Receiver<bool>, bucket: String, pool_index: usize) -> Result<()> {
// Placeholder for actual bucket rebalance logic
warn!("Rebalancing bucket {} in pool {}", bucket, pool_index);
info!("Rebalancing bucket {} in pool {}", bucket, pool_index);
// TODO: other config
// if bucket != RUSTFS_META_BUCKET{
@@ -1047,12 +1047,12 @@ impl ECStore {
// let wk = wk.clone();
let set = set.clone();
Box::pin(async move {
warn!("rebalance_entry: rebalance_entry spawn start");
info!("rebalance_entry: rebalance_entry spawn start");
// wk.take().await;
// tokio::spawn(async move {
warn!("rebalance_entry: rebalance_entry spawn start2");
info!("rebalance_entry: rebalance_entry spawn start2");
this.rebalance_entry(bucket, pool_index, entry, set).await;
warn!("rebalance_entry: rebalance_entry spawn done");
info!("rebalance_entry: rebalance_entry spawn done");
// });
})
}
@@ -1079,7 +1079,7 @@ impl ECStore {
for job in jobs {
job.await.unwrap();
}
warn!("rebalance_bucket: rebalance_bucket done");
info!("rebalance_bucket: rebalance_bucket done");
Ok(())
}
@@ -1089,7 +1089,7 @@ impl ECStore {
let mut meta = RebalanceMeta::new();
if let Err(err) = meta.load(self.pools[0].clone()).await {
if err != Error::ConfigNotFound {
warn!("save_rebalance_stats: load err: {:?}", err);
info!("save_rebalance_stats: load err: {:?}", err);
return Err(err);
}
}
@@ -1117,7 +1117,7 @@ impl ECStore {
*rebalance_meta = Some(meta.clone());
}
warn!(
info!(
"save_rebalance_stats: save rebalance meta, pool_idx: {}, opt: {:?}, meta: {:?}",
pool_idx, opt, meta
);
@@ -1135,15 +1135,15 @@ impl SetDisks {
bucket: String,
cb: ListCallback,
) -> Result<()> {
warn!("list_objects_to_rebalance: start list_objects_to_rebalance");
info!("list_objects_to_rebalance: start list_objects_to_rebalance");
// Placeholder for actual object listing logic
let (disks, _) = self.get_online_disks_with_healing(false).await;
if disks.is_empty() {
warn!("list_objects_to_rebalance: no disk available");
info!("list_objects_to_rebalance: no disk available");
return Err(Error::other("errNoDiskAvailable"));
}
warn!("list_objects_to_rebalance: get online disks with healing");
info!("list_objects_to_rebalance: get online disks with healing");
let listing_quorum = self.set_drive_count.div_ceil(2);
let resolver = MetadataResolutionParams {
@@ -1162,7 +1162,7 @@ impl SetDisks {
recursice: true,
min_disks: listing_quorum,
agreed: Some(Box::new(move |entry: MetaCacheEntry| {
warn!("list_objects_to_rebalance: agreed: {:?}", &entry.name);
info!("list_objects_to_rebalance: agreed: {:?}", &entry.name);
Box::pin(cb1(entry))
})),
partial: Some(Box::new(move |entries: MetaCacheEntries, _: &[Option<DiskError>]| {
@@ -1172,11 +1172,11 @@ impl SetDisks {
match entries.resolve(resolver) {
Some(entry) => {
warn!("list_objects_to_rebalance: list_objects_to_decommission get {}", &entry.name);
info!("list_objects_to_rebalance: list_objects_to_decommission get {}", &entry.name);
Box::pin(async move { cb(entry).await })
}
None => {
warn!("list_objects_to_rebalance: list_objects_to_decommission get none");
info!("list_objects_to_rebalance: list_objects_to_decommission get none");
Box::pin(async {})
}
}
@@ -1186,7 +1186,7 @@ impl SetDisks {
)
.await?;
warn!("list_objects_to_rebalance: list_objects_to_rebalance done");
info!("list_objects_to_rebalance: list_objects_to_rebalance done");
Ok(())
}
}

View File

@@ -81,7 +81,7 @@ use rustfs_utils::{
use s3s::header::X_AMZ_RESTORE;
use sha2::{Digest, Sha256};
use std::hash::Hash;
use std::mem;
use std::mem::{self};
use std::time::SystemTime;
use std::{
collections::{HashMap, HashSet},
@@ -107,12 +107,12 @@ use workers::workers::Workers;
pub const DEFAULT_READ_BUFFER_SIZE: usize = 1024 * 1024;
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct SetDisks {
pub lockers: Vec<LockApi>,
pub locker_owner: String,
pub ns_mutex: Arc<RwLock<NsLockMap>>,
pub disks: RwLock<Vec<Option<DiskStore>>>,
pub disks: Arc<RwLock<Vec<Option<DiskStore>>>>,
pub set_endpoints: Vec<Endpoint>,
pub set_drive_count: usize,
pub default_parity_count: usize,
@@ -122,12 +122,54 @@ pub struct SetDisks {
}
impl SetDisks {
#[allow(clippy::too_many_arguments)]
pub async fn new(
lockers: Vec<LockApi>,
locker_owner: String,
ns_mutex: Arc<RwLock<NsLockMap>>,
disks: Arc<RwLock<Vec<Option<DiskStore>>>>,
set_drive_count: usize,
default_parity_count: usize,
set_index: usize,
pool_index: usize,
set_endpoints: Vec<Endpoint>,
format: FormatV3,
) -> Arc<Self> {
Arc::new(SetDisks {
lockers,
locker_owner,
ns_mutex,
disks,
set_drive_count,
default_parity_count,
set_index,
pool_index,
format,
set_endpoints,
})
}
async fn get_disks_internal(&self) -> Vec<Option<DiskStore>> {
let rl = self.disks.read().await;
rl.clone()
}
pub async fn get_local_disks(&self) -> Vec<Option<DiskStore>> {
let rl = self.disks.read().await;
let mut disks: Vec<Option<DiskStore>> = rl
.clone()
.into_iter()
.filter(|v| v.as_ref().is_some_and(|d| d.is_local()))
.collect();
let mut rng = rand::rng();
disks.shuffle(&mut rng);
disks
}
async fn get_online_disks(&self) -> Vec<Option<DiskStore>> {
let mut disks = self.get_disks_internal().await;

View File

@@ -35,8 +35,8 @@ use tokio::sync::RwLock;
use uuid::Uuid;
use crate::heal::heal_ops::HealSequence;
use tokio::sync::broadcast::{Receiver, Sender};
use tokio::time::Duration;
use tokio_util::sync::CancellationToken;
use tracing::warn;
use tracing::{error, info};
@@ -55,7 +55,15 @@ pub struct Sets {
pub set_drive_count: usize,
pub default_parity_count: usize,
pub distribution_algo: DistributionAlgoVersion,
ctx: CancellationToken,
exit_signal: Option<Sender<()>>,
}
impl Drop for Sets {
fn drop(&mut self) {
if let Some(exit_signal) = self.exit_signal.take() {
let _ = exit_signal.send(());
}
}
}
impl Sets {
@@ -144,22 +152,25 @@ impl Sets {
// warn!("sets new set_drive {:?}", &set_drive);
let set_disks = SetDisks {
lockers: locker.clone(),
locker_owner: GLOBAL_Local_Node_Name.read().await.to_string(),
ns_mutex: Arc::new(RwLock::new(NsLockMap::new(is_dist_erasure().await))),
disks: RwLock::new(set_drive),
let set_disks = SetDisks::new(
locker.clone(),
GLOBAL_Local_Node_Name.read().await.to_string(),
Arc::new(RwLock::new(NsLockMap::new(is_dist_erasure().await))),
Arc::new(RwLock::new(set_drive)),
set_drive_count,
default_parity_count: partiy_count,
set_index: i,
pool_index: pool_idx,
partiy_count,
i,
pool_idx,
set_endpoints,
format: fm.clone(),
};
fm.clone(),
)
.await;
disk_set.push(Arc::new(set_disks));
disk_set.push(set_disks);
}
let (tx, rx) = tokio::sync::broadcast::channel(1);
let sets = Arc::new(Self {
id: fm.id,
// sets: todo!(),
@@ -173,12 +184,17 @@ impl Sets {
set_drive_count,
default_parity_count: partiy_count,
distribution_algo: fm.erasure.distribution_algo.clone(),
ctx: CancellationToken::new(),
exit_signal: Some(tx),
});
let asets = sets.clone();
tokio::spawn(async move { asets.monitor_and_connect_endpoints().await });
let rx1 = rx.resubscribe();
tokio::spawn(async move { asets.monitor_and_connect_endpoints(rx1).await });
// let sets2 = sets.clone();
// let rx2 = rx.resubscribe();
// tokio::spawn(async move { sets2.cleanup_deleted_objects_loop(rx2).await });
Ok(sets)
}
@@ -186,7 +202,38 @@ impl Sets {
pub fn set_drive_count(&self) -> usize {
self.set_drive_count
}
pub async fn monitor_and_connect_endpoints(&self) {
// pub async fn cleanup_deleted_objects_loop(self: Arc<Self>, mut rx: Receiver<()>) {
// tokio::time::sleep(Duration::from_secs(5)).await;
// info!("start cleanup_deleted_objects_loop");
// // TODO: config interval
// let mut interval = tokio::time::interval(Duration::from_secs(15 * 3));
// loop {
// tokio::select! {
// _= interval.tick()=>{
// info!("cleanup_deleted_objects_loop tick");
// for set in self.disk_set.iter() {
// set.clone().cleanup_deleted_objects().await;
// }
// interval.reset();
// },
// _ = rx.recv() => {
// warn!("cleanup_deleted_objects_loop ctx cancelled");
// break;
// }
// }
// }
// warn!("cleanup_deleted_objects_loop exit");
// }
pub async fn monitor_and_connect_endpoints(&self, mut rx: Receiver<()>) {
tokio::time::sleep(Duration::from_secs(5)).await;
info!("start monitor_and_connect_endpoints");
@@ -195,7 +242,6 @@ impl Sets {
// TODO: config interval
let mut interval = tokio::time::interval(Duration::from_secs(15 * 3));
let cloned_token = self.ctx.clone();
loop {
tokio::select! {
_= interval.tick()=>{
@@ -205,7 +251,7 @@ impl Sets {
interval.reset();
},
_ = cloned_token.cancelled() => {
_ = rx.recv() => {
warn!("monitor_and_connect_endpoints ctx cancelled");
break;
}

View File

@@ -134,13 +134,11 @@ impl GetObjectReader {
return Err(Error::other(format!("invalid decompressed size {}", actual_size)));
};
warn!("actual_size: {}", actual_size);
let dec_reader = LimitReader::new(dec_reader, actual_size);
let mut oi = oi.clone();
oi.size = dec_length;
warn!("oi.size: {}, off: {}, length: {}", oi.size, off, length);
return Ok((
GetObjectReader {
stream: Box::new(dec_reader),

View File

@@ -15,7 +15,7 @@ use crate::{
use futures::future::join_all;
use std::collections::{HashMap, hash_map::Entry};
use tracing::{debug, warn};
use tracing::{debug, info, warn};
use uuid::Uuid;
pub async fn init_disks(eps: &Endpoints, opt: &DiskOption) -> (Vec<Option<DiskStore>>, Vec<Option<DiskError>>) {
@@ -52,10 +52,9 @@ pub async fn connect_load_init_formats(
set_drive_count: usize,
deployment_id: Option<Uuid>,
) -> Result<FormatV3> {
warn!("connect_load_init_formats first_disk: {}", first_disk);
let (formats, errs) = load_format_erasure_all(disks, false).await;
debug!("load_format_erasure_all errs {:?}", &errs);
// debug!("load_format_erasure_all errs {:?}", &errs);
check_disk_fatal_errs(&errs)?;
@@ -63,14 +62,14 @@ pub async fn connect_load_init_formats(
if first_disk && should_init_erasure_disks(&errs) {
// UnformattedDisk, not format file create
warn!("first_disk && should_init_erasure_disks");
info!("first_disk && should_init_erasure_disks");
// new format and save
let fm = init_format_erasure(disks, set_count, set_drive_count, deployment_id).await?;
return Ok(fm);
}
warn!(
info!(
"first_disk: {}, should_init_erasure_disks: {}",
first_disk,
should_init_erasure_disks(&errs)

View File

@@ -458,5 +458,5 @@ async fn load_tier_config(api: Arc<ECStore>) -> std::result::Result<TierConfigMg
}
pub fn is_err_config_not_found(err: &StorageError) -> bool {
matches!(err, StorageError::ObjectNotFound(_, _))
matches!(err, StorageError::ObjectNotFound(_, _)) || err == &StorageError::ConfigNotFound
}