mirror of
https://github.com/rustfs/rustfs.git
synced 2026-01-17 01:30:33 +00:00
done read/write quorum, need test
This commit is contained in:
@@ -204,10 +204,10 @@ impl LocalDisk {
|
||||
fs::create_dir_all(dst_data_path.parent().unwrap_or(Path::new("/"))).await?;
|
||||
}
|
||||
|
||||
debug!(
|
||||
"rename_all from \n {:?} \n to \n {:?} \n skip:{:?}",
|
||||
&src_data_path, &dst_data_path, &skip
|
||||
);
|
||||
// debug!(
|
||||
// "rename_all from \n {:?} \n to \n {:?} \n skip:{:?}",
|
||||
// &src_data_path, &dst_data_path, &skip
|
||||
// );
|
||||
|
||||
fs::rename(&src_data_path, &dst_data_path).await?;
|
||||
|
||||
@@ -221,7 +221,7 @@ impl LocalDisk {
|
||||
fs::create_dir_all(parent).await?;
|
||||
}
|
||||
}
|
||||
debug!("move_to_trash from:{:?} to {:?}", &delete_path, &trash_path);
|
||||
// debug!("move_to_trash from:{:?} to {:?}", &delete_path, &trash_path);
|
||||
// TODO: 清空回收站
|
||||
if let Err(err) = fs::rename(&delete_path, &trash_path).await {
|
||||
match err.kind() {
|
||||
@@ -263,15 +263,15 @@ impl LocalDisk {
|
||||
recursive: bool,
|
||||
immediate_purge: bool,
|
||||
) -> Result<()> {
|
||||
debug!("delete_file {:?}\n base_path:{:?}", &delete_path, &base_path);
|
||||
// debug!("delete_file {:?}\n base_path:{:?}", &delete_path, &base_path);
|
||||
|
||||
if is_root_path(base_path) || is_root_path(delete_path) {
|
||||
debug!("delete_file skip {:?}", &delete_path);
|
||||
// debug!("delete_file skip {:?}", &delete_path);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if !delete_path.starts_with(base_path) || base_path == delete_path {
|
||||
debug!("delete_file skip {:?}", &delete_path);
|
||||
// debug!("delete_file skip {:?}", &delete_path);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
@@ -279,9 +279,9 @@ impl LocalDisk {
|
||||
self.move_to_trash(delete_path, recursive, immediate_purge).await?;
|
||||
} else {
|
||||
if delete_path.is_dir() {
|
||||
debug!("delete_file remove_dir {:?}", &delete_path);
|
||||
// debug!("delete_file remove_dir {:?}", &delete_path);
|
||||
if let Err(err) = fs::remove_dir(&delete_path).await {
|
||||
debug!("remove_dir err {:?} when {:?}", &err, &delete_path);
|
||||
// debug!("remove_dir err {:?} when {:?}", &err, &delete_path);
|
||||
match err.kind() {
|
||||
ErrorKind::NotFound => (),
|
||||
// ErrorKind::DirectoryNotEmpty => (),
|
||||
@@ -293,10 +293,10 @@ impl LocalDisk {
|
||||
}
|
||||
}
|
||||
}
|
||||
debug!("delete_file remove_dir done {:?}", &delete_path);
|
||||
// debug!("delete_file remove_dir done {:?}", &delete_path);
|
||||
} else {
|
||||
if let Err(err) = fs::remove_file(&delete_path).await {
|
||||
debug!("remove_file err {:?} when {:?}", &err, &delete_path);
|
||||
// debug!("remove_file err {:?} when {:?}", &err, &delete_path);
|
||||
match err.kind() {
|
||||
ErrorKind::NotFound => (),
|
||||
_ => {
|
||||
@@ -312,7 +312,7 @@ impl LocalDisk {
|
||||
Box::pin(self.delete_file(base_path, &PathBuf::from(dir_path), false, false)).await?;
|
||||
}
|
||||
|
||||
debug!("delete_file done {:?}", &delete_path);
|
||||
// debug!("delete_file done {:?}", &delete_path);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -811,7 +811,7 @@ impl DiskAPI for LocalDisk {
|
||||
async fn read_file(&self, volume: &str, path: &str) -> Result<FileReader> {
|
||||
let p = self.get_object_path(volume, path)?;
|
||||
|
||||
debug!("read_file {:?}", &p);
|
||||
// debug!("read_file {:?}", &p);
|
||||
let file = File::options().read(true).open(&p).await?;
|
||||
|
||||
Ok(FileReader::Local(LocalFileReader::new(file)))
|
||||
|
||||
@@ -26,7 +26,7 @@ pub struct Erasure {
|
||||
|
||||
impl Erasure {
|
||||
pub fn new(data_shards: usize, parity_shards: usize, block_size: usize) -> Self {
|
||||
warn!(
|
||||
debug!(
|
||||
"Erasure new data_shards {},parity_shards {} block_size {} ",
|
||||
data_shards, parity_shards, block_size
|
||||
);
|
||||
@@ -57,7 +57,7 @@ impl Erasure {
|
||||
{
|
||||
let mut stream = ChunkedStream::new(body, total_size, self.block_size, false);
|
||||
let mut total: usize = 0;
|
||||
let mut idx = 0;
|
||||
// let mut idx = 0;
|
||||
while let Some(result) = stream.next().await {
|
||||
match result {
|
||||
Ok(data) => {
|
||||
@@ -68,19 +68,19 @@ impl Erasure {
|
||||
break;
|
||||
}
|
||||
|
||||
idx += 1;
|
||||
debug!("encode {} get data {}", idx, data.len());
|
||||
// idx += 1;
|
||||
// debug!("encode {} get data {}", idx, data.len());
|
||||
|
||||
let blocks = self.encode_data(data.as_ref())?;
|
||||
|
||||
debug!(
|
||||
"encode shard {} size: {}/{} from block_size {}, total_size {} ",
|
||||
idx,
|
||||
blocks[0].len(),
|
||||
blocks.len(),
|
||||
data.len(),
|
||||
total_size
|
||||
);
|
||||
// debug!(
|
||||
// "encode shard {} size: {}/{} from block_size {}, total_size {} ",
|
||||
// idx,
|
||||
// blocks[0].len(),
|
||||
// blocks.len(),
|
||||
// data.len(),
|
||||
// total_size
|
||||
// );
|
||||
|
||||
let mut errs = Vec::new();
|
||||
|
||||
@@ -94,14 +94,13 @@ impl Erasure {
|
||||
}
|
||||
}
|
||||
|
||||
debug!("Erasure encode errs {:?}", &errs);
|
||||
|
||||
let none_count = errs.iter().filter(|&x| x.is_none()).count();
|
||||
if none_count >= write_quorum {
|
||||
continue;
|
||||
}
|
||||
|
||||
if let Some(err) = reduce_write_quorum_errs(&errs, object_op_ignored_errs().as_ref(), write_quorum) {
|
||||
warn!("Erasure encode errs {:?}", &errs);
|
||||
return Err(err);
|
||||
}
|
||||
}
|
||||
@@ -161,7 +160,7 @@ impl Erasure {
|
||||
break;
|
||||
}
|
||||
|
||||
debug!("decode {} block_offset {},block_length {} ", block_idx, block_offset, block_length);
|
||||
// debug!("decode {} block_offset {},block_length {} ", block_idx, block_offset, block_length);
|
||||
|
||||
let mut bufs = reader.read().await?;
|
||||
|
||||
@@ -175,7 +174,7 @@ impl Erasure {
|
||||
|
||||
bytes_writed += writed_n;
|
||||
|
||||
debug!("decode {} writed_n {}, total_writed: {} ", block_idx, writed_n, bytes_writed);
|
||||
// debug!("decode {} writed_n {}, total_writed: {} ", block_idx, writed_n, bytes_writed);
|
||||
}
|
||||
|
||||
if bytes_writed != length {
|
||||
|
||||
@@ -23,7 +23,7 @@ use tokio::sync::RwLock;
|
||||
use tokio::sync::Semaphore;
|
||||
use tokio::time::Duration;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, warn};
|
||||
use tracing::{debug, info, warn};
|
||||
use uuid::Uuid;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
@@ -135,14 +135,17 @@ impl Sets {
|
||||
pub async fn monitor_and_connect_endpoints(&self) {
|
||||
tokio::time::sleep(Duration::from_secs(5)).await;
|
||||
|
||||
info!("start monitor_and_connect_endpoints");
|
||||
|
||||
self.connect_disks().await;
|
||||
|
||||
// TODO: config interval
|
||||
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(15 * 3));
|
||||
let cloned_token = self.ctx.clone();
|
||||
loop {
|
||||
tokio::select! {
|
||||
_= interval.tick()=>{
|
||||
debug!("tick...");
|
||||
// debug!("tick...");
|
||||
self.connect_disks().await;
|
||||
|
||||
interval.reset();
|
||||
@@ -159,11 +162,11 @@ impl Sets {
|
||||
}
|
||||
|
||||
async fn connect_disks(&self) {
|
||||
debug!("start connect_disks ...");
|
||||
// debug!("start connect_disks ...");
|
||||
for set in self.disk_set.iter() {
|
||||
set.connect_disks().await;
|
||||
}
|
||||
debug!("done connect_disks ...");
|
||||
// debug!("done connect_disks ...");
|
||||
}
|
||||
|
||||
pub fn get_disks(&self, set_idx: usize) -> Arc<SetDisks> {
|
||||
|
||||
@@ -173,7 +173,7 @@ impl ECStore {
|
||||
|
||||
let mut local_disks = Vec::new();
|
||||
|
||||
info!("endpoint_pools: {:?}", endpoint_pools);
|
||||
debug!("endpoint_pools: {:?}", endpoint_pools);
|
||||
|
||||
for (i, pool_eps) in endpoint_pools.as_ref().iter().enumerate() {
|
||||
// TODO: read from config parseStorageClass
|
||||
|
||||
@@ -13,7 +13,7 @@ use std::{
|
||||
fmt::Debug,
|
||||
};
|
||||
|
||||
use tracing::warn;
|
||||
use tracing::{debug, info, warn};
|
||||
use uuid::Uuid;
|
||||
|
||||
pub async fn init_disks(eps: &Endpoints, opt: &DiskOption) -> (Vec<Option<DiskStore>>, Vec<Option<Error>>) {
|
||||
@@ -50,13 +50,11 @@ pub async fn connect_load_init_formats(
|
||||
set_drive_count: usize,
|
||||
deployment_id: Option<Uuid>,
|
||||
) -> Result<FormatV3, Error> {
|
||||
warn!("connect_load_init_formats id: {:?}, first_disk: {}", deployment_id, first_disk);
|
||||
|
||||
let (formats, errs) = load_format_erasure_all(disks, false).await;
|
||||
|
||||
DiskError::check_disk_fatal_errs(&errs)?;
|
||||
debug!("load_format_erasure_all errs {:?}", &errs);
|
||||
|
||||
warn!("load_format_erasure_all errs {:?}", &errs);
|
||||
DiskError::check_disk_fatal_errs(&errs)?;
|
||||
|
||||
check_format_erasure_values(&formats, set_drive_count)?;
|
||||
|
||||
@@ -67,7 +65,7 @@ pub async fn connect_load_init_formats(
|
||||
|
||||
let errs = save_format_file_all(disks, &fms).await;
|
||||
|
||||
warn!("save_format_file_all errs {:?}", &errs);
|
||||
debug!("save_format_file_all errs {:?}", &errs);
|
||||
// TODO: check quorum
|
||||
// reduceWriteQuorumErrs(&errs)?;
|
||||
|
||||
@@ -132,15 +130,8 @@ fn get_format_erasure_in_quorum(formats: &[Option<FormatV3>]) -> Result<FormatV3
|
||||
|
||||
let (max_drives, max_count) = countmap.iter().max_by_key(|&(_, c)| c).unwrap_or((&0, &0));
|
||||
|
||||
warn!("get_format_erasure_in_quorum fi: {:?}", &formats);
|
||||
|
||||
if *max_drives == 0 || *max_count <= formats.len() / 2 {
|
||||
warn!(
|
||||
"*max_drives == 0 || *max_count < formats.len() / 2, {} || {}<{}",
|
||||
max_drives,
|
||||
max_count,
|
||||
formats.len() / 2
|
||||
);
|
||||
warn!("get_format_erasure_in_quorum fi: {:?}", &formats);
|
||||
return Err(Error::new(ErasureError::ErasureReadQuorum));
|
||||
}
|
||||
|
||||
|
||||
@@ -164,10 +164,9 @@ async fn run(opt: config::Opt) -> Result<()> {
|
||||
}
|
||||
});
|
||||
|
||||
warn!(" init store");
|
||||
// init store
|
||||
ECStore::new(opt.address.clone(), endpoint_pools.clone()).await?;
|
||||
warn!(" init store success!");
|
||||
info!(" init store success!");
|
||||
|
||||
tokio::select! {
|
||||
_ = tokio::signal::ctrl_c() => {
|
||||
|
||||
@@ -7,7 +7,7 @@ mkdir -p ./target/volume/test{0..4}
|
||||
|
||||
|
||||
if [ -z "$RUST_LOG" ]; then
|
||||
export RUST_LOG="rustfs=debug,ecstore=info,s3s=debug"
|
||||
export RUST_LOG="rustfs=debug,ecstore=debug,s3s=debug"
|
||||
fi
|
||||
|
||||
DATA_DIR_ARG="./target/volume/test{0...4}"
|
||||
|
||||
Reference in New Issue
Block a user