From f46c53b77e1fbcc87b1f697033d0d484c05db0e8 Mon Sep 17 00:00:00 2001 From: weisd Date: Wed, 25 Sep 2024 16:21:21 +0800 Subject: [PATCH] done read/write quorum, need test --- ecstore/src/disk/local.rs | 28 ++++++++++++++-------------- ecstore/src/erasure.rs | 31 +++++++++++++++---------------- ecstore/src/sets.rs | 11 +++++++---- ecstore/src/store.rs | 2 +- ecstore/src/store_init.rs | 19 +++++-------------- rustfs/src/main.rs | 3 +-- scripts/run.sh | 2 +- 7 files changed, 44 insertions(+), 52 deletions(-) diff --git a/ecstore/src/disk/local.rs b/ecstore/src/disk/local.rs index c1a9900d..a798f7d1 100644 --- a/ecstore/src/disk/local.rs +++ b/ecstore/src/disk/local.rs @@ -204,10 +204,10 @@ impl LocalDisk { fs::create_dir_all(dst_data_path.parent().unwrap_or(Path::new("/"))).await?; } - debug!( - "rename_all from \n {:?} \n to \n {:?} \n skip:{:?}", - &src_data_path, &dst_data_path, &skip - ); + // debug!( + // "rename_all from \n {:?} \n to \n {:?} \n skip:{:?}", + // &src_data_path, &dst_data_path, &skip + // ); fs::rename(&src_data_path, &dst_data_path).await?; @@ -221,7 +221,7 @@ impl LocalDisk { fs::create_dir_all(parent).await?; } } - debug!("move_to_trash from:{:?} to {:?}", &delete_path, &trash_path); + // debug!("move_to_trash from:{:?} to {:?}", &delete_path, &trash_path); // TODO: 清空回收站 if let Err(err) = fs::rename(&delete_path, &trash_path).await { match err.kind() { @@ -263,15 +263,15 @@ impl LocalDisk { recursive: bool, immediate_purge: bool, ) -> Result<()> { - debug!("delete_file {:?}\n base_path:{:?}", &delete_path, &base_path); + // debug!("delete_file {:?}\n base_path:{:?}", &delete_path, &base_path); if is_root_path(base_path) || is_root_path(delete_path) { - debug!("delete_file skip {:?}", &delete_path); + // debug!("delete_file skip {:?}", &delete_path); return Ok(()); } if !delete_path.starts_with(base_path) || base_path == delete_path { - debug!("delete_file skip {:?}", &delete_path); + // debug!("delete_file skip {:?}", &delete_path); return Ok(()); } @@ -279,9 +279,9 @@ impl LocalDisk { self.move_to_trash(delete_path, recursive, immediate_purge).await?; } else { if delete_path.is_dir() { - debug!("delete_file remove_dir {:?}", &delete_path); + // debug!("delete_file remove_dir {:?}", &delete_path); if let Err(err) = fs::remove_dir(&delete_path).await { - debug!("remove_dir err {:?} when {:?}", &err, &delete_path); + // debug!("remove_dir err {:?} when {:?}", &err, &delete_path); match err.kind() { ErrorKind::NotFound => (), // ErrorKind::DirectoryNotEmpty => (), @@ -293,10 +293,10 @@ impl LocalDisk { } } } - debug!("delete_file remove_dir done {:?}", &delete_path); + // debug!("delete_file remove_dir done {:?}", &delete_path); } else { if let Err(err) = fs::remove_file(&delete_path).await { - debug!("remove_file err {:?} when {:?}", &err, &delete_path); + // debug!("remove_file err {:?} when {:?}", &err, &delete_path); match err.kind() { ErrorKind::NotFound => (), _ => { @@ -312,7 +312,7 @@ impl LocalDisk { Box::pin(self.delete_file(base_path, &PathBuf::from(dir_path), false, false)).await?; } - debug!("delete_file done {:?}", &delete_path); + // debug!("delete_file done {:?}", &delete_path); Ok(()) } @@ -811,7 +811,7 @@ impl DiskAPI for LocalDisk { async fn read_file(&self, volume: &str, path: &str) -> Result { let p = self.get_object_path(volume, path)?; - debug!("read_file {:?}", &p); + // debug!("read_file {:?}", &p); let file = File::options().read(true).open(&p).await?; Ok(FileReader::Local(LocalFileReader::new(file))) diff --git a/ecstore/src/erasure.rs b/ecstore/src/erasure.rs index 76da879c..c6ff3c19 100644 --- a/ecstore/src/erasure.rs +++ b/ecstore/src/erasure.rs @@ -26,7 +26,7 @@ pub struct Erasure { impl Erasure { pub fn new(data_shards: usize, parity_shards: usize, block_size: usize) -> Self { - warn!( + debug!( "Erasure new data_shards {},parity_shards {} block_size {} ", data_shards, parity_shards, block_size ); @@ -57,7 +57,7 @@ impl Erasure { { let mut stream = ChunkedStream::new(body, total_size, self.block_size, false); let mut total: usize = 0; - let mut idx = 0; + // let mut idx = 0; while let Some(result) = stream.next().await { match result { Ok(data) => { @@ -68,19 +68,19 @@ impl Erasure { break; } - idx += 1; - debug!("encode {} get data {}", idx, data.len()); + // idx += 1; + // debug!("encode {} get data {}", idx, data.len()); let blocks = self.encode_data(data.as_ref())?; - debug!( - "encode shard {} size: {}/{} from block_size {}, total_size {} ", - idx, - blocks[0].len(), - blocks.len(), - data.len(), - total_size - ); + // debug!( + // "encode shard {} size: {}/{} from block_size {}, total_size {} ", + // idx, + // blocks[0].len(), + // blocks.len(), + // data.len(), + // total_size + // ); let mut errs = Vec::new(); @@ -94,14 +94,13 @@ impl Erasure { } } - debug!("Erasure encode errs {:?}", &errs); - let none_count = errs.iter().filter(|&x| x.is_none()).count(); if none_count >= write_quorum { continue; } if let Some(err) = reduce_write_quorum_errs(&errs, object_op_ignored_errs().as_ref(), write_quorum) { + warn!("Erasure encode errs {:?}", &errs); return Err(err); } } @@ -161,7 +160,7 @@ impl Erasure { break; } - debug!("decode {} block_offset {},block_length {} ", block_idx, block_offset, block_length); + // debug!("decode {} block_offset {},block_length {} ", block_idx, block_offset, block_length); let mut bufs = reader.read().await?; @@ -175,7 +174,7 @@ impl Erasure { bytes_writed += writed_n; - debug!("decode {} writed_n {}, total_writed: {} ", block_idx, writed_n, bytes_writed); + // debug!("decode {} writed_n {}, total_writed: {} ", block_idx, writed_n, bytes_writed); } if bytes_writed != length { diff --git a/ecstore/src/sets.rs b/ecstore/src/sets.rs index 95526ab7..5ae8f62a 100644 --- a/ecstore/src/sets.rs +++ b/ecstore/src/sets.rs @@ -23,7 +23,7 @@ use tokio::sync::RwLock; use tokio::sync::Semaphore; use tokio::time::Duration; use tokio_util::sync::CancellationToken; -use tracing::{debug, warn}; +use tracing::{debug, info, warn}; use uuid::Uuid; #[derive(Debug, Clone)] @@ -135,14 +135,17 @@ impl Sets { pub async fn monitor_and_connect_endpoints(&self) { tokio::time::sleep(Duration::from_secs(5)).await; + info!("start monitor_and_connect_endpoints"); + self.connect_disks().await; + // TODO: config interval let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(15 * 3)); let cloned_token = self.ctx.clone(); loop { tokio::select! { _= interval.tick()=>{ - debug!("tick..."); + // debug!("tick..."); self.connect_disks().await; interval.reset(); @@ -159,11 +162,11 @@ impl Sets { } async fn connect_disks(&self) { - debug!("start connect_disks ..."); + // debug!("start connect_disks ..."); for set in self.disk_set.iter() { set.connect_disks().await; } - debug!("done connect_disks ..."); + // debug!("done connect_disks ..."); } pub fn get_disks(&self, set_idx: usize) -> Arc { diff --git a/ecstore/src/store.rs b/ecstore/src/store.rs index 1454832d..a92177a6 100644 --- a/ecstore/src/store.rs +++ b/ecstore/src/store.rs @@ -173,7 +173,7 @@ impl ECStore { let mut local_disks = Vec::new(); - info!("endpoint_pools: {:?}", endpoint_pools); + debug!("endpoint_pools: {:?}", endpoint_pools); for (i, pool_eps) in endpoint_pools.as_ref().iter().enumerate() { // TODO: read from config parseStorageClass diff --git a/ecstore/src/store_init.rs b/ecstore/src/store_init.rs index 35292b2e..96503746 100644 --- a/ecstore/src/store_init.rs +++ b/ecstore/src/store_init.rs @@ -13,7 +13,7 @@ use std::{ fmt::Debug, }; -use tracing::warn; +use tracing::{debug, info, warn}; use uuid::Uuid; pub async fn init_disks(eps: &Endpoints, opt: &DiskOption) -> (Vec>, Vec>) { @@ -50,13 +50,11 @@ pub async fn connect_load_init_formats( set_drive_count: usize, deployment_id: Option, ) -> Result { - warn!("connect_load_init_formats id: {:?}, first_disk: {}", deployment_id, first_disk); - let (formats, errs) = load_format_erasure_all(disks, false).await; - DiskError::check_disk_fatal_errs(&errs)?; + debug!("load_format_erasure_all errs {:?}", &errs); - warn!("load_format_erasure_all errs {:?}", &errs); + DiskError::check_disk_fatal_errs(&errs)?; check_format_erasure_values(&formats, set_drive_count)?; @@ -67,7 +65,7 @@ pub async fn connect_load_init_formats( let errs = save_format_file_all(disks, &fms).await; - warn!("save_format_file_all errs {:?}", &errs); + debug!("save_format_file_all errs {:?}", &errs); // TODO: check quorum // reduceWriteQuorumErrs(&errs)?; @@ -132,15 +130,8 @@ fn get_format_erasure_in_quorum(formats: &[Option]) -> Result Result<()> { } }); - warn!(" init store"); // init store ECStore::new(opt.address.clone(), endpoint_pools.clone()).await?; - warn!(" init store success!"); + info!(" init store success!"); tokio::select! { _ = tokio::signal::ctrl_c() => { diff --git a/scripts/run.sh b/scripts/run.sh index 248d286b..1a7b2073 100755 --- a/scripts/run.sh +++ b/scripts/run.sh @@ -7,7 +7,7 @@ mkdir -p ./target/volume/test{0..4} if [ -z "$RUST_LOG" ]; then - export RUST_LOG="rustfs=debug,ecstore=info,s3s=debug" + export RUST_LOG="rustfs=debug,ecstore=debug,s3s=debug" fi DATA_DIR_ARG="./target/volume/test{0...4}"