Merge pull request #370 from rustfs/pref

Perf
This commit is contained in:
guojidan
2025-04-29 18:56:24 +08:00
committed by GitHub
7 changed files with 58 additions and 35 deletions

1
Cargo.lock generated
View File

@@ -7269,6 +7269,7 @@ dependencies = [
"serde_json",
"serde_urlencoded",
"shadow-rs",
"socket2",
"tikv-jemallocator",
"time",
"tokio",

View File

@@ -38,7 +38,9 @@ use crate::set_disk::{
CHECK_PART_VOLUME_NOT_FOUND,
};
use crate::store_api::{BitrotAlgorithm, StorageAPI};
use crate::utils::fs::{access, lstat, remove, remove_all, rename, O_APPEND, O_CREATE, O_RDONLY, O_WRONLY};
use crate::utils::fs::{
access, lstat, remove, remove_all, remove_all_std, remove_std, rename, O_APPEND, O_CREATE, O_RDONLY, O_WRONLY,
};
use crate::utils::os::get_info;
use crate::utils::path::{
self, clean, decode_dir_object, encode_dir_object, has_suffix, path_join, path_join_buf, GLOBAL_DIR_SUFFIX,
@@ -315,9 +317,9 @@ impl LocalDisk {
#[allow(unused_variables)]
pub async fn move_to_trash(&self, delete_path: &PathBuf, recursive: bool, immediate_purge: bool) -> Result<()> {
if recursive {
remove_all(delete_path).await?;
remove_all_std(delete_path)?;
} else {
remove(delete_path).await?;
remove_std(delete_path)?;
}
return Ok(());
@@ -365,7 +367,7 @@ impl LocalDisk {
Ok(())
}
// #[tracing::instrument(skip(self))]
#[tracing::instrument(skip(self))]
pub async fn delete_file(
&self,
base_path: &PathBuf,

View File

@@ -147,6 +147,7 @@ pub fn reduce_read_quorum_errs(
// Validate the error count against the write quorum.
// Returns the index of the most frequent error, or a QuorumError.
#[tracing::instrument(level = "info", skip_all)]
pub fn reduce_write_quorum_errs(
errs: &[Option<Error>],
ignored_errs: &[Box<dyn CheckErrorFn>],

View File

@@ -407,7 +407,7 @@ impl SetDisks {
}
#[allow(dead_code)]
#[tracing::instrument(level = "debug", skip(self, disks))]
#[tracing::instrument(level = "info", skip(self, disks))]
async fn commit_rename_data_dir(
&self,
disks: &[Option<DiskStore>],
@@ -417,40 +417,30 @@ impl SetDisks {
write_quorum: usize,
) -> Result<()> {
let file_path = format!("{}/{}", object, data_dir);
let mut futures = Vec::with_capacity(disks.len());
let mut errs = Vec::with_capacity(disks.len());
for disk in disks.iter() {
let futures = disks.iter().map(|disk| {
let file_path = file_path.clone();
futures.push(async move {
async move {
if let Some(disk) = disk {
disk.delete(
bucket,
&file_path,
DeleteOptions {
recursive: true,
..Default::default()
},
)
.await
match disk
.delete(
bucket,
&file_path,
DeleteOptions {
recursive: true,
..Default::default()
},
)
.await
{
Ok(_) => None,
Err(e) => Some(e),
}
} else {
Err(Error::new(DiskError::DiskNotFound))
}
});
}
let results = join_all(futures).await;
for result in results {
match result {
Ok(_) => {
errs.push(None);
}
Err(e) => {
errs.push(Some(e));
Some(Error::new(DiskError::DiskNotFound))
}
}
}
});
let errs: Vec<Option<Error>> = join_all(futures).await;
if let Some(err) = reduce_write_quorum_errs(&errs, object_op_ignored_errs().as_ref(), write_quorum) {
return Err(err);

View File

@@ -132,6 +132,24 @@ pub async fn remove_all(path: impl AsRef<Path>) -> io::Result<()> {
}
}
/// Synchronously remove a single filesystem entry.
///
/// Directories are removed with `remove_dir` (which fails if the directory is
/// non-empty); anything else — regular files and symlinks — is removed with
/// `remove_file`. Propagates the underlying I/O error (e.g. `NotFound` when
/// `path` does not exist).
pub fn remove_std(path: impl AsRef<Path>) -> io::Result<()> {
    let path = path.as_ref();
    // Use symlink_metadata so the entry itself is inspected rather than the
    // link target: `metadata` on a symlink-to-directory reports a directory,
    // and `remove_dir` on the link path then fails (ENOTDIR); a dangling
    // symlink would error in `metadata` and could never be removed at all.
    // The link itself is correctly deleted via `remove_file`.
    let meta = std::fs::symlink_metadata(path)?;
    if meta.is_dir() {
        std::fs::remove_dir(path)
    } else {
        std::fs::remove_file(path)
    }
}
/// Synchronously remove a filesystem entry, recursing into directories.
///
/// Directories are deleted with `remove_dir_all` (contents and all); anything
/// else — regular files and symlinks — with `remove_file`. Propagates the
/// underlying I/O error (e.g. `NotFound` when `path` does not exist).
pub fn remove_all_std(path: impl AsRef<Path>) -> io::Result<()> {
    let path = path.as_ref();
    // symlink_metadata: don't follow links. With `metadata`, a symlink to a
    // directory classifies as a dir and `remove_dir_all` on the link path
    // errors instead of deleting the link; a dangling symlink errors in
    // `metadata` and becomes unremovable. Deleting the link itself via
    // `remove_file` is the intended trash semantics.
    let meta = std::fs::symlink_metadata(path)?;
    if meta.is_dir() {
        std::fs::remove_dir_all(path)
    } else {
        std::fs::remove_file(path)
    }
}
/// Asynchronously create a single directory at `path` (non-recursive),
/// delegating to `fs::create_dir`.
// NOTE(review): `fs` here is the async fs module imported at the top of the
// file (presumably tokio::fs) — errors if the parent is missing or the path
// already exists; confirm against the file's imports.
pub async fn mkdir(path: impl AsRef<Path>) -> io::Result<()> {
    fs::create_dir(path.as_ref()).await
}

View File

@@ -62,6 +62,7 @@ serde.workspace = true
serde_json.workspace = true
serde_urlencoded = { workspace = true }
shadow-rs = { workspace = true, features = ["build", "metadata"] }
socket2 = "0.5.9"
tracing.workspace = true
time = { workspace = true, features = ["parsing", "formatting", "serde"] }
tokio-util.workspace = true

View File

@@ -53,6 +53,7 @@ use rustfs_obs::{init_obs, init_process_observer, load_config, set_global_guard}
use rustls::ServerConfig;
use s3s::{host::MultiDomain, service::S3ServiceBuilder};
use service::hybrid;
use socket2::SockRef;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
@@ -65,6 +66,8 @@ use tower_http::trace::TraceLayer;
use tracing::{debug, error, info, warn};
use tracing::{instrument, Span};
// const MI_B: usize = 1024 * 1024;
#[cfg(all(target_os = "linux", target_env = "gnu"))]
#[global_allocator]
static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
@@ -428,9 +431,16 @@ async fn run(opt: config::Opt) -> Result<()> {
}
};
if let Err(err) = socket.set_nodelay(true) {
let socket_ref = SockRef::from(&socket);
if let Err(err) = socket_ref.set_nodelay(true) {
warn!(?err, "Failed to set TCP_NODELAY");
}
// if let Err(err) = socket_ref.set_recv_buffer_size(4 * MI_B) {
// warn!(?err, "Failed to set set_recv_buffer_size");
// }
// if let Err(err) = socket_ref.set_send_buffer_size(4 * MI_B) {
// warn!(?err, "Failed to set set_send_buffer_size");
// }
if has_tls_certs {
debug!("TLS certificates found, starting with SIGINT");