From 7e1135df8f75e83c0b3070981b7621126ba97ebd Mon Sep 17 00:00:00 2001 From: junxiang Mu <1948535941@qq.com> Date: Tue, 29 Apr 2025 10:53:03 +0000 Subject: [PATCH 1/2] tmp3 Signed-off-by: junxiang Mu <1948535941@qq.com> --- Cargo.lock | 1 + ecstore/src/disk/local.rs | 10 ++++---- ecstore/src/quorum.rs | 1 + ecstore/src/set_disk.rs | 50 ++++++++++++++++----------------------- ecstore/src/utils/fs.rs | 18 ++++++++++++++ rustfs/Cargo.toml | 1 + rustfs/src/main.rs | 12 +++++++++- 7 files changed, 58 insertions(+), 35 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 81d6b4c3..1c62f665 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7269,6 +7269,7 @@ dependencies = [ "serde_json", "serde_urlencoded", "shadow-rs", + "socket2", "tikv-jemallocator", "time", "tokio", diff --git a/ecstore/src/disk/local.rs b/ecstore/src/disk/local.rs index 45abdbf5..45bea2d9 100644 --- a/ecstore/src/disk/local.rs +++ b/ecstore/src/disk/local.rs @@ -38,7 +38,9 @@ use crate::set_disk::{ CHECK_PART_VOLUME_NOT_FOUND, }; use crate::store_api::{BitrotAlgorithm, StorageAPI}; -use crate::utils::fs::{access, lstat, remove, remove_all, rename, O_APPEND, O_CREATE, O_RDONLY, O_WRONLY}; +use crate::utils::fs::{ + access, lstat, remove, remove_all, remove_all_std, remove_std, rename, O_APPEND, O_CREATE, O_RDONLY, O_WRONLY, +}; use crate::utils::os::get_info; use crate::utils::path::{ self, clean, decode_dir_object, encode_dir_object, has_suffix, path_join, path_join_buf, GLOBAL_DIR_SUFFIX, @@ -315,9 +317,9 @@ impl LocalDisk { #[allow(unused_variables)] pub async fn move_to_trash(&self, delete_path: &PathBuf, recursive: bool, immediate_purge: bool) -> Result<()> { if recursive { - remove_all(delete_path).await?; + remove_all_std(delete_path)?; } else { - remove(delete_path).await?; + remove_std(delete_path)?; } return Ok(()); @@ -365,7 +367,7 @@ impl LocalDisk { Ok(()) } - // #[tracing::instrument(skip(self))] + #[tracing::instrument(skip(self))] pub async fn delete_file( &self, base_path: &PathBuf, diff --git a/ecstore/src/quorum.rs b/ecstore/src/quorum.rs index d89fa523..d38177ca 100644 --- a/ecstore/src/quorum.rs +++ b/ecstore/src/quorum.rs @@ -147,6 +147,7 @@ pub fn reduce_read_quorum_errs( // 根据写quorum验证错误数量 // 返回最大错误数量的下标,或QuorumError +#[tracing::instrument(level = "info", skip_all)] pub fn reduce_write_quorum_errs( errs: &[Option], ignored_errs: &[Box], diff --git a/ecstore/src/set_disk.rs b/ecstore/src/set_disk.rs index 12491b10..abf7422a 100644 --- a/ecstore/src/set_disk.rs +++ b/ecstore/src/set_disk.rs @@ -407,7 +407,7 @@ impl SetDisks { } #[allow(dead_code)] - #[tracing::instrument(level = "debug", skip(self, disks))] + #[tracing::instrument(level = "info", skip(self, disks))] async fn commit_rename_data_dir( &self, disks: &[Option], @@ -417,40 +417,30 @@ impl SetDisks { write_quorum: usize, ) -> Result<()> { let file_path = format!("{}/{}", object, data_dir); - - let mut futures = Vec::with_capacity(disks.len()); - let mut errs = Vec::with_capacity(disks.len()); - - for disk in disks.iter() { + let futures = disks.iter().map(|disk| { let file_path = file_path.clone(); - futures.push(async move { + async move { if let Some(disk) = disk { - disk.delete( - bucket, - &file_path, - DeleteOptions { - recursive: true, - ..Default::default() - }, - ) - .await + match disk + .delete( + bucket, + &file_path, + DeleteOptions { + recursive: true, + ..Default::default() + }, + ) + .await + { + Ok(_) => None, + Err(e) => Some(e), + } } else { - Err(Error::new(DiskError::DiskNotFound)) - } - }); - } - - let results = join_all(futures).await; - for result in results { - match result { - Ok(_) => { - errs.push(None); - } - Err(e) => { - errs.push(Some(e)); + Some(Error::new(DiskError::DiskNotFound)) } } - } + }); + let errs: Vec> = join_all(futures).await; if let Some(err) = reduce_write_quorum_errs(&errs, object_op_ignored_errs().as_ref(), write_quorum) { return Err(err); diff --git a/ecstore/src/utils/fs.rs b/ecstore/src/utils/fs.rs index 463989c0..9c0c591f 100644 --- a/ecstore/src/utils/fs.rs +++ b/ecstore/src/utils/fs.rs @@ -132,6 +132,24 @@ pub async fn remove_all(path: impl AsRef) -> io::Result<()> { } } +pub fn remove_std(path: impl AsRef) -> io::Result<()> { + let meta = std::fs::metadata(path.as_ref())?; + if meta.is_dir() { + std::fs::remove_dir(path.as_ref()) + } else { + std::fs::remove_file(path.as_ref()) + } +} + +pub fn remove_all_std(path: impl AsRef) -> io::Result<()> { + let meta = std::fs::metadata(path.as_ref())?; + if meta.is_dir() { + std::fs::remove_dir_all(path.as_ref()) + } else { + std::fs::remove_file(path.as_ref()) + } +} + pub async fn mkdir(path: impl AsRef) -> io::Result<()> { fs::create_dir(path.as_ref()).await } diff --git a/rustfs/Cargo.toml b/rustfs/Cargo.toml index 3019c060..460e55be 100644 --- a/rustfs/Cargo.toml +++ b/rustfs/Cargo.toml @@ -62,6 +62,7 @@ serde.workspace = true serde_json.workspace = true serde_urlencoded = { workspace = true } shadow-rs = { workspace = true, features = ["build", "metadata"] } +socket2 = "0.5.9" tracing.workspace = true time = { workspace = true, features = ["parsing", "formatting", "serde"] } tokio-util.workspace = true diff --git a/rustfs/src/main.rs b/rustfs/src/main.rs index 7871fba5..c5eb684a 100644 --- a/rustfs/src/main.rs +++ b/rustfs/src/main.rs @@ -53,6 +53,7 @@ use rustfs_obs::{init_obs, init_process_observer, load_config, set_global_guard} use rustls::ServerConfig; use s3s::{host::MultiDomain, service::S3ServiceBuilder}; use service::hybrid; +use socket2::SockRef; use std::net::SocketAddr; use std::sync::Arc; use std::time::Duration; @@ -65,6 +66,8 @@ use tower_http::trace::TraceLayer; use tracing::Span; use tracing::{debug, error, info, info_span, warn}; +const MI_B: usize = 1024 * 1024; + #[cfg(all(target_os = "linux", target_env = "gnu"))] #[global_allocator] static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc; @@ -425,9 +428,16 @@ async fn run(opt: config::Opt) -> Result<()> { } }; - if let Err(err) = socket.set_nodelay(true) { + let socket_ref = SockRef::from(&socket); + if let Err(err) = socket_ref.set_nodelay(true) { warn!(?err, "Failed to set TCP_NODELAY"); } + if let Err(err) = socket_ref.set_recv_buffer_size(4 * MI_B) { + warn!(?err, "Failed to set set_recv_buffer_size"); + } + if let Err(err) = socket_ref.set_send_buffer_size(4 * MI_B) { + warn!(?err, "Failed to set set_send_buffer_size"); + } if has_tls_certs { debug!("TLS certificates found, starting with SIGINT"); From 8f917e4a196592e6789c4f14a8ee09a87bdd8aa3 Mon Sep 17 00:00:00 2001 From: junxiang Mu <1948535941@qq.com> Date: Tue, 29 Apr 2025 10:55:02 +0000 Subject: [PATCH 2/2] tmp4 Signed-off-by: junxiang Mu <1948535941@qq.com> --- rustfs/src/main.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/rustfs/src/main.rs b/rustfs/src/main.rs index c5eb684a..8355a324 100644 --- a/rustfs/src/main.rs +++ b/rustfs/src/main.rs @@ -66,7 +66,7 @@ use tower_http::trace::TraceLayer; use tracing::Span; use tracing::{debug, error, info, info_span, warn}; -const MI_B: usize = 1024 * 1024; +// const MI_B: usize = 1024 * 1024; #[cfg(all(target_os = "linux", target_env = "gnu"))] #[global_allocator] @@ -432,12 +432,12 @@ async fn run(opt: config::Opt) -> Result<()> { if let Err(err) = socket_ref.set_nodelay(true) { warn!(?err, "Failed to set TCP_NODELAY"); } - if let Err(err) = socket_ref.set_recv_buffer_size(4 * MI_B) { - warn!(?err, "Failed to set set_recv_buffer_size"); - } - if let Err(err) = socket_ref.set_send_buffer_size(4 * MI_B) { - warn!(?err, "Failed to set set_send_buffer_size"); - } + // if let Err(err) = socket_ref.set_recv_buffer_size(4 * MI_B) { + // warn!(?err, "Failed to set set_recv_buffer_size"); + // } + // if let Err(err) = socket_ref.set_send_buffer_size(4 * MI_B) { + // warn!(?err, "Failed to set set_send_buffer_size"); + // } if has_tls_certs { debug!("TLS certificates found, starting with SIGINT");