mirror of https://github.com/rustfs/rustfs.git

fix cargo test error, delete unnecessary files
@@ -1,331 +0,0 @@
use pin_project_lite::pin_project;
use rustfs_utils::{HashAlgorithm, read_full, write_all};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite};

pin_project! {
    /// BitrotReader reads (hash+data) blocks from an async reader and verifies hash integrity.
    pub struct BitrotReader<R> {
        #[pin]
        inner: R,
        hash_algo: HashAlgorithm,
        shard_size: usize,
        buf: Vec<u8>,
        hash_buf: Vec<u8>,
        hash_read: usize,
        data_buf: Vec<u8>,
        data_read: usize,
        hash_checked: bool,
    }
}

impl<R> BitrotReader<R>
where
    R: AsyncRead + Unpin + Send + Sync,
{
    /// Create a new BitrotReader.
    pub fn new(inner: R, shard_size: usize, algo: HashAlgorithm) -> Self {
        let hash_size = algo.size();
        Self {
            inner,
            hash_algo: algo,
            shard_size,
            buf: Vec::new(),
            hash_buf: vec![0u8; hash_size],
            hash_read: 0,
            data_buf: Vec::new(),
            data_read: 0,
            hash_checked: false,
        }
    }

    /// Read a single (hash+data) block, verify the hash, and return the number of bytes read into `out`.
    /// Returns an error if hash verification fails or the data exceeds `shard_size`.
    pub async fn read(&mut self, out: &mut [u8]) -> std::io::Result<usize> {
        if out.len() > self.shard_size {
            return Err(std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                format!("data size {} exceeds shard size {}", out.len(), self.shard_size),
            ));
        }

        let hash_size = self.hash_algo.size();
        // Read the hash prefix.
        let mut hash_buf = vec![0u8; hash_size];
        if hash_size > 0 {
            self.inner.read_exact(&mut hash_buf).await?;
        }

        let data_len = read_full(&mut self.inner, out).await?;

        // // Read data
        // let mut data_len = 0;
        // while data_len < out.len() {
        //     let n = self.inner.read(&mut out[data_len..]).await?;
        //     if n == 0 {
        //         break;
        //     }
        //     data_len += n;
        //     // Only read up to one shard_size block
        //     if data_len >= self.shard_size {
        //         break;
        //     }
        // }

        if hash_size > 0 {
            let actual_hash = self.hash_algo.hash_encode(&out[..data_len]);
            if actual_hash != hash_buf {
                return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "bitrot hash mismatch"));
            }
        }
        Ok(data_len)
    }
}

pin_project! {
    /// BitrotWriter writes (hash+data) blocks to an async writer.
    pub struct BitrotWriter<W> {
        #[pin]
        inner: W,
        hash_algo: HashAlgorithm,
        shard_size: usize,
        buf: Vec<u8>,
        finished: bool,
    }
}

impl<W> BitrotWriter<W>
where
    W: AsyncWrite + Unpin + Send + Sync,
{
    /// Create a new BitrotWriter.
    pub fn new(inner: W, shard_size: usize, algo: HashAlgorithm) -> Self {
        let hash_algo = algo;
        Self {
            inner,
            hash_algo,
            shard_size,
            buf: Vec::new(),
            finished: false,
        }
    }

    pub fn into_inner(self) -> W {
        self.inner
    }

    /// Write a (hash+data) block. Returns the number of data bytes written.
    /// Returns an error if called after a short write or if the data exceeds `shard_size`.
    pub async fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        if buf.is_empty() {
            return Ok(0);
        }

        if self.finished {
            return Err(std::io::Error::new(std::io::ErrorKind::InvalidInput, "bitrot writer already finished"));
        }

        if buf.len() > self.shard_size {
            return Err(std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                format!("data size {} exceeds shard size {}", buf.len(), self.shard_size),
            ));
        }

        if buf.len() < self.shard_size {
            self.finished = true;
        }

        let hash_algo = &self.hash_algo;

        if hash_algo.size() > 0 {
            let hash = hash_algo.hash_encode(buf);
            self.buf.extend_from_slice(&hash);
        }

        self.buf.extend_from_slice(buf);

        // Write hash+data in one call.
        let mut n = write_all(&mut self.inner, &self.buf).await?;

        if n < hash_algo.size() {
            return Err(std::io::Error::new(
                std::io::ErrorKind::WriteZero,
                "short write: not enough bytes written",
            ));
        }

        n -= hash_algo.size();

        self.buf.clear();

        Ok(n)
    }
}

pub fn bitrot_shard_file_size(size: usize, shard_size: usize, algo: HashAlgorithm) -> usize {
    if algo != HashAlgorithm::HighwayHash256S {
        return size;
    }
    size.div_ceil(shard_size) * algo.size() + size
}

pub async fn bitrot_verify<R: AsyncRead + Unpin + Send>(
    mut r: R,
    want_size: usize,
    part_size: usize,
    algo: HashAlgorithm,
    _want: Vec<u8>,
    mut shard_size: usize,
) -> std::io::Result<()> {
    let mut hash_buf = vec![0; algo.size()];
    let mut left = want_size;

    if left != bitrot_shard_file_size(part_size, shard_size, algo.clone()) {
        return Err(std::io::Error::other("bitrot shard file size mismatch"));
    }

    while left > 0 {
        let n = r.read_exact(&mut hash_buf).await?;
        left -= n;

        if left < shard_size {
            shard_size = left;
        }

        let mut buf = vec![0; shard_size];
        let read = r.read_exact(&mut buf).await?;

        let actual_hash = algo.hash_encode(&buf);
        if actual_hash != hash_buf[0..n] {
            return Err(std::io::Error::other("bitrot hash mismatch"));
        }

        left -= read;
    }

    Ok(())
}

#[cfg(test)]
mod tests {

    use crate::{BitrotReader, BitrotWriter};
    use rustfs_utils::HashAlgorithm;
    use std::io::Cursor;

    #[tokio::test]
    async fn test_bitrot_read_write_ok() {
        let data = b"hello world! this is a test shard.";
        let data_size = data.len();
        let shard_size = 8;

        let buf: Vec<u8> = Vec::new();
        let writer = Cursor::new(buf);
        let mut bitrot_writer = BitrotWriter::new(writer, shard_size, HashAlgorithm::HighwayHash256);

        let mut n = 0;
        for chunk in data.chunks(shard_size) {
            n += bitrot_writer.write(chunk).await.unwrap();
        }
        assert_eq!(n, data.len());

        // Read back
        let reader = bitrot_writer.into_inner();
        let mut bitrot_reader = BitrotReader::new(reader, shard_size, HashAlgorithm::HighwayHash256);
        let mut out = Vec::new();
        let mut n = 0;
        while n < data_size {
            let mut buf = vec![0u8; shard_size];
            let m = bitrot_reader.read(&mut buf).await.unwrap();
            if m == 0 {
                break;
            }
            assert_eq!(&buf[..m], &data[n..n + m]);

            out.extend_from_slice(&buf[..m]);
            n += m;
        }

        assert_eq!(n, data_size);
        assert_eq!(data, &out[..]);
    }

    #[tokio::test]
    async fn test_bitrot_read_hash_mismatch() {
        let data = b"test data for bitrot";
        let data_size = data.len();
        let shard_size = 8;
        let buf: Vec<u8> = Vec::new();
        let writer = Cursor::new(buf);
        let mut bitrot_writer = BitrotWriter::new(writer, shard_size, HashAlgorithm::HighwayHash256);
        for chunk in data.chunks(shard_size) {
            let _ = bitrot_writer.write(chunk).await.unwrap();
        }
        let mut written = bitrot_writer.into_inner().into_inner();
        // change the last byte to make the hash mismatch
        let pos = written.len() - 1;
        written[pos] ^= 0xFF;
        let reader = Cursor::new(written);
        let mut bitrot_reader = BitrotReader::new(reader, shard_size, HashAlgorithm::HighwayHash256);

        let count = data_size.div_ceil(shard_size);

        let mut idx = 0;
        let mut n = 0;
        while n < data_size {
            let mut buf = vec![0u8; shard_size];
            let res = bitrot_reader.read(&mut buf).await;

            if idx == count - 1 {
                // The last block should return an error.
                assert!(res.is_err());
                assert_eq!(res.unwrap_err().kind(), std::io::ErrorKind::InvalidData);
                break;
            }

            let m = res.unwrap();
            if m == 0 {
                break;
            }

            assert_eq!(&buf[..m], &data[n..n + m]);

            n += m;
            idx += 1;
        }
    }

    #[tokio::test]
    async fn test_bitrot_read_write_none_hash() {
        let data = b"bitrot none hash test data!";
        let data_size = data.len();
        let shard_size = 8;

        let buf: Vec<u8> = Vec::new();
        let writer = Cursor::new(buf);
        let mut bitrot_writer = BitrotWriter::new(writer, shard_size, HashAlgorithm::None);

        let mut n = 0;
        for chunk in data.chunks(shard_size) {
            n += bitrot_writer.write(chunk).await.unwrap();
        }
        assert_eq!(n, data.len());

        let reader = bitrot_writer.into_inner();
        let mut bitrot_reader = BitrotReader::new(reader, shard_size, HashAlgorithm::None);
        let mut out = Vec::new();
        let mut n = 0;
        while n < data_size {
            let mut buf = vec![0u8; shard_size];
            let m = bitrot_reader.read(&mut buf).await.unwrap();
            if m == 0 {
                break;
            }
            assert_eq!(&buf[..m], &data[n..n + m]);
            out.extend_from_slice(&buf[..m]);
            n += m;
        }
        assert_eq!(n, data_size);
        assert_eq!(data, &out[..]);
    }
}
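A quick sanity check on the size bookkeeping above: `bitrot_shard_file_size` charges one digest per shard on top of the payload. A minimal sketch, under the assumption that `HashAlgorithm::HighwayHash256S` reports a 32-byte digest from `size()`:

// Hypothetical numbers: a 34-byte part written in 8-byte shards spans
// ceil(34 / 8) = 5 blocks, each prefixed by a digest, so the on-disk
// footprint is 5 * 32 + 34 = 194 bytes.
assert_eq!(bitrot_shard_file_size(34, 8, HashAlgorithm::HighwayHash256S), 194);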
@@ -162,6 +162,7 @@ mod tests {

        assert!(wrapper.is_err());
        let error = wrapper.unwrap_err();
        assert!(error.to_string().contains("io error"));
        println!("error: {:?}", error);
        assert_eq!(error, DiskError::DiskNotFound);
    }
}
@@ -2,7 +2,7 @@ use super::error::{Error, Result};
 use super::os::{is_root_disk, rename_all};
 use super::{
     BUCKET_META_PREFIX, CheckPartsResp, DeleteOptions, DiskAPI, DiskInfo, DiskInfoOptions, DiskLocation, DiskMetrics,
-    FileInfoVersions, Info, RUSTFS_META_BUCKET, ReadMultipleReq, ReadMultipleResp, ReadOptions, RenameDataResp,
+    FileInfoVersions, RUSTFS_META_BUCKET, ReadMultipleReq, ReadMultipleResp, ReadOptions, RenameDataResp,
     STORAGE_FORMAT_FILE_BACKUP, UpdateMetadataOpts, VolumeInfo, WalkDirOptions, os,
 };
 use super::{endpoint::Endpoint, error::DiskError, format::FormatV3};
@@ -32,7 +32,7 @@ use crate::heal::heal_commands::{HealScanMode, HealingTracker};
 use crate::heal::heal_ops::HEALING_TRACKER_FILENAME;
 use crate::new_object_layer_fn;
 use crate::store_api::{ObjectInfo, StorageAPI};
-use crate::utils::os::get_info;
+// use crate::utils::os::get_info;
 use crate::utils::path::{
     GLOBAL_DIR_SUFFIX, GLOBAL_DIR_SUFFIX_WITH_SLASH, SLASH_SEPARATOR, clean, decode_dir_object, encode_dir_object, has_suffix,
     path_join, path_join_buf,
@@ -46,6 +46,7 @@ use rustfs_filemeta::{
     read_xl_meta_no_data,
 };
 use rustfs_utils::HashAlgorithm;
+use rustfs_utils::os::get_info;
 use std::collections::{HashMap, HashSet};
 use std::fmt::Debug;
 use std::io::SeekFrom;
@@ -2284,7 +2285,7 @@ impl DiskAPI for LocalDisk {
     }
 }

-async fn get_disk_info(drive_path: PathBuf) -> Result<(Info, bool)> {
+async fn get_disk_info(drive_path: PathBuf) -> Result<(rustfs_utils::os::DiskInfo, bool)> {
     let drive_path = drive_path.to_string_lossy().to_string();
     check_path_length(&drive_path)?;

@@ -367,6 +367,7 @@ mod tests {

         // Read back
         let reader = bitrot_writer.into_inner();
+        let reader = Cursor::new(reader.into_inner());
         let mut bitrot_reader = BitrotReader::new(reader, shard_size, HashAlgorithm::HighwayHash256);
         let mut out = Vec::new();
         let mut n = 0;
@@ -442,6 +443,7 @@ mod tests {
         assert_eq!(n, data.len());

         let reader = bitrot_writer.into_inner();
+        let reader = Cursor::new(reader.into_inner());
         let mut bitrot_reader = BitrotReader::new(reader, shard_size, HashAlgorithm::None);
         let mut out = Vec::new();
         let mut n = 0;
File diff suppressed because it is too large
@@ -1,238 +0,0 @@
use common::error::{Error, Result};
use serde::{Deserialize, Serialize};
use std::io::{Cursor, Read};
use uuid::Uuid;

#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct InlineData(Vec<u8>);

const INLINE_DATA_VER: u8 = 1;

impl InlineData {
    pub fn new() -> Self {
        Self(Vec::new())
    }

    pub fn update(&mut self, buf: &[u8]) {
        self.0 = buf.to_vec()
    }

    pub fn as_slice(&self) -> &[u8] {
        self.0.as_slice()
    }

    pub fn version_ok(&self) -> bool {
        if self.0.is_empty() {
            return true;
        }

        self.0[0] > 0 && self.0[0] <= INLINE_DATA_VER
    }

    pub fn after_version(&self) -> &[u8] {
        if self.0.is_empty() { &self.0 } else { &self.0[1..] }
    }

    pub fn find(&self, key: &str) -> Result<Option<Vec<u8>>> {
        if self.0.is_empty() || !self.version_ok() {
            return Ok(None);
        }

        let buf = self.after_version();

        let mut cur = Cursor::new(buf);

        let mut fields_len = rmp::decode::read_map_len(&mut cur)?;

        while fields_len > 0 {
            fields_len -= 1;

            let str_len = rmp::decode::read_str_len(&mut cur)?;

            let mut field_buff = vec![0u8; str_len as usize];

            cur.read_exact(&mut field_buff)?;

            let field = String::from_utf8(field_buff)?;

            let bin_len = rmp::decode::read_bin_len(&mut cur)? as usize;
            let start = cur.position() as usize;
            let end = start + bin_len;
            cur.set_position(end as u64);

            if field.as_str() == key {
                let buf = &buf[start..end];
                return Ok(Some(buf.to_vec()));
            }
        }

        Ok(None)
    }

    pub fn validate(&self) -> Result<()> {
        if self.0.is_empty() {
            return Ok(());
        }

        let mut cur = Cursor::new(self.after_version());

        let mut fields_len = rmp::decode::read_map_len(&mut cur)?;

        while fields_len > 0 {
            fields_len -= 1;

            let str_len = rmp::decode::read_str_len(&mut cur)?;

            let mut field_buff = vec![0u8; str_len as usize];

            cur.read_exact(&mut field_buff)?;

            let field = String::from_utf8(field_buff)?;
            if field.is_empty() {
                return Err(Error::msg("InlineData key empty"));
            }

            let bin_len = rmp::decode::read_bin_len(&mut cur)? as usize;
            let start = cur.position() as usize;
            let end = start + bin_len;
            cur.set_position(end as u64);
        }

        Ok(())
    }

    pub fn replace(&mut self, key: &str, value: Vec<u8>) -> Result<()> {
        if self.after_version().is_empty() {
            let mut keys = Vec::with_capacity(1);
            let mut values = Vec::with_capacity(1);

            keys.push(key.to_owned());
            values.push(value);

            return self.serialize(keys, values);
        }

        let buf = self.after_version();
        let mut cur = Cursor::new(buf);

        let mut fields_len = rmp::decode::read_map_len(&mut cur)? as usize;
        let mut keys = Vec::with_capacity(fields_len + 1);
        let mut values = Vec::with_capacity(fields_len + 1);

        let mut replaced = false;

        while fields_len > 0 {
            fields_len -= 1;

            let str_len = rmp::decode::read_str_len(&mut cur)?;

            let mut field_buff = vec![0u8; str_len as usize];

            cur.read_exact(&mut field_buff)?;

            let find_key = String::from_utf8(field_buff)?;

            let bin_len = rmp::decode::read_bin_len(&mut cur)? as usize;
            let start = cur.position() as usize;
            let end = start + bin_len;
            cur.set_position(end as u64);

            let find_value = &buf[start..end];

            if find_key.as_str() == key {
                values.push(value.clone());
                replaced = true
            } else {
                values.push(find_value.to_vec());
            }

            keys.push(find_key);
        }

        if !replaced {
            keys.push(key.to_owned());
            values.push(value);
        }

        self.serialize(keys, values)
    }

    pub fn remove(&mut self, remove_keys: Vec<Uuid>) -> Result<bool> {
        let buf = self.after_version();
        let mut cur = Cursor::new(buf);

        let mut fields_len = rmp::decode::read_map_len(&mut cur)? as usize;
        let mut keys = Vec::with_capacity(fields_len + 1);
        let mut values = Vec::with_capacity(fields_len + 1);

        let remove_key = |found_key: &str| {
            for key in remove_keys.iter() {
                if key.to_string().as_str() == found_key {
                    return true;
                }
            }
            false
        };

        let mut found = false;

        while fields_len > 0 {
            fields_len -= 1;

            let str_len = rmp::decode::read_str_len(&mut cur)?;

            let mut field_buff = vec![0u8; str_len as usize];

            cur.read_exact(&mut field_buff)?;

            let find_key = String::from_utf8(field_buff)?;

            let bin_len = rmp::decode::read_bin_len(&mut cur)? as usize;
            let start = cur.position() as usize;
            let end = start + bin_len;
            cur.set_position(end as u64);

            let find_value = &buf[start..end];

            if !remove_key(&find_key) {
                values.push(find_value.to_vec());
                keys.push(find_key);
            } else {
                found = true;
            }
        }

        if !found {
            return Ok(false);
        }

        if keys.is_empty() {
            self.0 = Vec::new();
            return Ok(true);
        }

        self.serialize(keys, values)?;
        Ok(true)
    }

    fn serialize(&mut self, keys: Vec<String>, values: Vec<Vec<u8>>) -> Result<()> {
        assert_eq!(keys.len(), values.len(), "InlineData serialize: keys/values not match");

        if keys.is_empty() {
            self.0 = Vec::new();
            return Ok(());
        }

        let mut wr = Vec::new();

        wr.push(INLINE_DATA_VER);

        let map_len = keys.len();

        rmp::encode::write_map_len(&mut wr, map_len as u32)?;

        for i in 0..map_len {
            rmp::encode::write_str(&mut wr, keys[i].as_str())?;
            rmp::encode::write_bin(&mut wr, values[i].as_slice())?;
        }

        self.0 = wr;

        Ok(())
    }
}
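For orientation, the InlineData layout above is a single version byte (`INLINE_DATA_VER`) followed by a msgpack map from string keys to binary values, and `replace` rewrites the whole map. A minimal round-trip sketch using only the API above; the `part.*` keys are hypothetical (real callers key entries by version UUID strings):

fn inline_data_demo() -> Result<()> {
    let mut inline = InlineData::new();
    inline.replace("part.1", vec![1, 2, 3])?; // serializes version byte + 1-entry map
    inline.replace("part.2", vec![4, 5, 6])?; // re-serializes with 2 entries
    assert!(inline.version_ok());
    assert_eq!(inline.find("part.2")?, Some(vec![4, 5, 6]));
    inline.validate()?; // walks every key/value pair
    Ok(())
}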
@@ -1,580 +0,0 @@
use async_trait::async_trait;
use bytes::Bytes;
use futures::TryStreamExt;
use md5::Digest;
use md5::Md5;
use pin_project_lite::pin_project;
use std::io;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use std::task::ready;
use tokio::io::AsyncRead;
use tokio::io::AsyncWrite;
use tokio::io::ReadBuf;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio_util::io::ReaderStream;
use tokio_util::io::StreamReader;
use tracing::error;
use tracing::warn;

// pub type FileReader = Box<dyn AsyncRead + Send + Sync + Unpin>;
pub type FileWriter = Box<dyn AsyncWrite + Send + Sync + Unpin>;

pub const READ_BUFFER_SIZE: usize = 1024 * 1024;

#[derive(Debug)]
pub struct HttpFileWriter {
    wd: tokio::io::DuplexStream,
    err_rx: oneshot::Receiver<io::Error>,
}

impl HttpFileWriter {
    pub fn new(url: &str, disk: &str, volume: &str, path: &str, size: usize, append: bool) -> io::Result<Self> {
        let (rd, wd) = tokio::io::duplex(READ_BUFFER_SIZE);

        let (err_tx, err_rx) = oneshot::channel::<io::Error>();

        let body = reqwest::Body::wrap_stream(ReaderStream::with_capacity(rd, READ_BUFFER_SIZE));

        let url = url.to_owned();
        let disk = disk.to_owned();
        let volume = volume.to_owned();
        let path = path.to_owned();

        tokio::spawn(async move {
            let client = reqwest::Client::new();
            if let Err(err) = client
                .put(format!(
                    "{}/rustfs/rpc/put_file_stream?disk={}&volume={}&path={}&append={}&size={}",
                    url,
                    urlencoding::encode(&disk),
                    urlencoding::encode(&volume),
                    urlencoding::encode(&path),
                    append,
                    size
                ))
                .body(body)
                .send()
                .await
                .map_err(io::Error::other)
            {
                error!("HttpFileWriter put file err: {:?}", err);

                if let Err(er) = err_tx.send(err) {
                    error!("HttpFileWriter tx.send err: {:?}", er);
                }
            }
        });

        Ok(Self { wd, err_rx })
    }
}

impl AsyncWrite for HttpFileWriter {
    #[tracing::instrument(level = "debug", skip(self, buf))]
    fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize, io::Error>> {
        if let Ok(err) = self.as_mut().err_rx.try_recv() {
            return Poll::Ready(Err(err));
        }

        Pin::new(&mut self.wd).poll_write(cx, buf)
    }

    #[tracing::instrument(level = "debug", skip(self))]
    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
        Pin::new(&mut self.wd).poll_flush(cx)
    }

    #[tracing::instrument(level = "debug", skip(self))]
    fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
        Pin::new(&mut self.wd).poll_shutdown(cx)
    }
}

// pub struct HttpFileReader {
//     inner: FileReader,
// }

// impl HttpFileReader {
//     pub async fn new(url: &str, disk: &str, volume: &str, path: &str, offset: usize, length: usize) -> io::Result<Self> {
//         let resp = reqwest::Client::new()
//             .get(format!(
//                 "{}/rustfs/rpc/read_file_stream?disk={}&volume={}&path={}&offset={}&length={}",
//                 url,
//                 urlencoding::encode(disk),
//                 urlencoding::encode(volume),
//                 urlencoding::encode(path),
//                 offset,
//                 length
//             ))
//             .send()
//             .await
//             .map_err(io::Error::other)?;

//         let inner = Box::new(StreamReader::new(resp.bytes_stream().map_err(io::Error::other)));

//         Ok(Self { inner })
//     }
// }

// impl AsyncRead for HttpFileReader {
//     fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<()>> {
//         Pin::new(&mut self.inner).poll_read(cx, buf)
//     }
// }

#[async_trait]
pub trait Etag {
    async fn etag(self) -> String;
}

pin_project! {
    #[derive(Debug)]
    pub struct EtagReader<R> {
        inner: R,
        bytes_tx: mpsc::Sender<Bytes>,
        md5_rx: oneshot::Receiver<String>,
    }
}

impl<R> EtagReader<R> {
    pub fn new(inner: R) -> Self {
        let (bytes_tx, mut bytes_rx) = mpsc::channel::<Bytes>(8);
        let (md5_tx, md5_rx) = oneshot::channel::<String>();

        tokio::task::spawn_blocking(move || {
            let mut md5 = Md5::new();
            while let Some(bytes) = bytes_rx.blocking_recv() {
                md5.update(&bytes);
            }
            let digest = md5.finalize();
            let etag = hex_simd::encode_to_string(digest, hex_simd::AsciiCase::Lower);
            let _ = md5_tx.send(etag);
        });

        EtagReader { inner, bytes_tx, md5_rx }
    }
}

#[async_trait]
impl<R: Send> Etag for EtagReader<R> {
    async fn etag(self) -> String {
        drop(self.inner);
        drop(self.bytes_tx);
        self.md5_rx.await.unwrap()
    }
}

impl<R: AsyncRead + Unpin> AsyncRead for EtagReader<R> {
    #[tracing::instrument(level = "info", skip_all)]
    fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<()>> {
        let me = self.project();

        loop {
            let rem = buf.remaining();
            if rem != 0 {
                ready!(Pin::new(&mut *me.inner).poll_read(cx, buf))?;
                if buf.remaining() == rem {
                    return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "early eof")).into();
                }
            } else {
                let bytes = buf.filled();
                let bytes = Bytes::copy_from_slice(bytes);
                let tx = me.bytes_tx.clone();
                tokio::spawn(async move {
                    if let Err(e) = tx.send(bytes).await {
                        warn!("EtagReader send error: {:?}", e);
                    }
                });
                return Poll::Ready(Ok(()));
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    #[tokio::test]
    async fn test_constants() {
        assert_eq!(READ_BUFFER_SIZE, 1024 * 1024);
        // READ_BUFFER_SIZE is a compile-time constant, no need to assert
        // assert!(READ_BUFFER_SIZE > 0);
    }

    #[tokio::test]
    async fn test_http_file_writer_creation() {
        let writer = HttpFileWriter::new("http://localhost:8080", "test-disk", "test-volume", "test-path", 1024, false);

        assert!(writer.is_ok(), "HttpFileWriter creation should succeed");
    }

    #[tokio::test]
    async fn test_http_file_writer_creation_with_special_characters() {
        let writer = HttpFileWriter::new(
            "http://localhost:8080",
            "test disk with spaces",
            "test/volume",
            "test file with spaces & symbols.txt",
            1024,
            false,
        );

        assert!(writer.is_ok(), "HttpFileWriter creation with special characters should succeed");
    }

    #[tokio::test]
    async fn test_http_file_writer_creation_append_mode() {
        let writer = HttpFileWriter::new(
            "http://localhost:8080",
            "test-disk",
            "test-volume",
            "append-test.txt",
            1024,
            true, // append mode
        );

        assert!(writer.is_ok(), "HttpFileWriter creation in append mode should succeed");
    }

    #[tokio::test]
    async fn test_http_file_writer_creation_zero_size() {
        let writer = HttpFileWriter::new(
            "http://localhost:8080",
            "test-disk",
            "test-volume",
            "empty-file.txt",
            0, // zero size
            false,
        );

        assert!(writer.is_ok(), "HttpFileWriter creation with zero size should succeed");
    }

    #[tokio::test]
    async fn test_http_file_writer_creation_large_size() {
        let writer = HttpFileWriter::new(
            "http://localhost:8080",
            "test-disk",
            "test-volume",
            "large-file.txt",
            1024 * 1024 * 100, // 100MB
            false,
        );

        assert!(writer.is_ok(), "HttpFileWriter creation with large size should succeed");
    }

    #[tokio::test]
    async fn test_http_file_writer_invalid_url() {
        let writer = HttpFileWriter::new("invalid-url", "test-disk", "test-volume", "test-path", 1024, false);

        // This should still succeed at creation time; errors occur during actual I/O.
        assert!(writer.is_ok(), "HttpFileWriter creation should succeed even with invalid URL");
    }

    // #[tokio::test]
    // async fn test_http_file_reader_creation() {
    //     // Test creation without actually making HTTP requests
    //     // We'll test the URL construction logic by checking the error messages
    //     let result =
    //         HttpFileReader::new("http://invalid-server:9999", "test-disk", "test-volume", "test-file.txt", 0, 1024).await;

    //     // May succeed or fail depending on network conditions, but should not panic
    //     // The important thing is that the URL construction logic works
    //     assert!(result.is_ok() || result.is_err(), "HttpFileReader creation should not panic");
    // }

    // #[tokio::test]
    // async fn test_http_file_reader_with_offset_and_length() {
    //     let result = HttpFileReader::new(
    //         "http://invalid-server:9999",
    //         "test-disk",
    //         "test-volume",
    //         "test-file.txt",
    //         100, // offset
    //         500, // length
    //     )
    //     .await;

    //     // May succeed or fail, but this tests parameter handling
    //     assert!(result.is_ok() || result.is_err(), "HttpFileReader creation should not panic");
    // }

    // #[tokio::test]
    // async fn test_http_file_reader_zero_length() {
    //     let result = HttpFileReader::new(
    //         "http://invalid-server:9999",
    //         "test-disk",
    //         "test-volume",
    //         "test-file.txt",
    //         0,
    //         0, // zero length
    //     )
    //     .await;

    //     // May succeed or fail, but this tests zero length handling
    //     assert!(result.is_ok() || result.is_err(), "HttpFileReader creation should not panic");
    // }

    // #[tokio::test]
    // async fn test_http_file_reader_with_special_characters() {
    //     let result = HttpFileReader::new(
    //         "http://invalid-server:9999",
    //         "test disk with spaces",
    //         "test/volume",
    //         "test file with spaces & symbols.txt",
    //         0,
    //         1024,
    //     )
    //     .await;

    //     // May succeed or fail, but this tests URL encoding
    //     assert!(result.is_ok() || result.is_err(), "HttpFileReader creation should not panic");
    // }

    #[tokio::test]
    async fn test_etag_reader_creation() {
        let data = b"hello world";
        let cursor = Cursor::new(data);
        let etag_reader = EtagReader::new(cursor);

        // Test that the reader was created successfully
        assert!(format!("{:?}", etag_reader).contains("EtagReader"));
    }

    #[tokio::test]
    async fn test_etag_reader_read_and_compute() {
        let data = b"hello world";
        let cursor = Cursor::new(data);
        let etag_reader = EtagReader::new(cursor);

        // Test that EtagReader can be created and the etag method works.
        // Note: due to the complex implementation of EtagReader's poll_read,
        // we focus on testing creation and etag computation without reading.
        let etag = etag_reader.etag().await;
        assert!(!etag.is_empty(), "ETag should not be empty");
        assert_eq!(etag.len(), 32, "MD5 hash should be 32 characters"); // MD5 hex string
    }

    #[tokio::test]
    async fn test_etag_reader_empty_data() {
        let data = b"";
        let cursor = Cursor::new(data);
        let etag_reader = EtagReader::new(cursor);

        // Test ETag computation for empty data without reading
        let etag = etag_reader.etag().await;
        assert!(!etag.is_empty(), "ETag should not be empty even for empty data");
        assert_eq!(etag.len(), 32, "MD5 hash should be 32 characters");
        // MD5 of empty data should be d41d8cd98f00b204e9800998ecf8427e
        assert_eq!(etag, "d41d8cd98f00b204e9800998ecf8427e", "Empty data should have known MD5");
    }

    #[tokio::test]
    async fn test_etag_reader_large_data() {
        let data = vec![0u8; 10000]; // 10KB of zeros
        let cursor = Cursor::new(data.clone());
        let etag_reader = EtagReader::new(cursor);

        // Test ETag computation for large data without reading
        let etag = etag_reader.etag().await;
        assert!(!etag.is_empty(), "ETag should not be empty");
        assert_eq!(etag.len(), 32, "MD5 hash should be 32 characters");
    }

    #[tokio::test]
    async fn test_etag_reader_consistent_hash() {
        let data = b"test data for consistent hashing";

        // Create two identical readers
        let cursor1 = Cursor::new(data);
        let etag_reader1 = EtagReader::new(cursor1);

        let cursor2 = Cursor::new(data);
        let etag_reader2 = EtagReader::new(cursor2);

        // Compute ETags without reading
        let etag1 = etag_reader1.etag().await;
        let etag2 = etag_reader2.etag().await;

        assert_eq!(etag1, etag2, "ETags should be identical for identical data");
    }

    #[tokio::test]
    async fn test_etag_reader_different_data_different_hash() {
        let data1 = b"first data set";
        let data2 = b"second data set";

        let cursor1 = Cursor::new(data1);
        let etag_reader1 = EtagReader::new(cursor1);

        let cursor2 = Cursor::new(data2);
        let etag_reader2 = EtagReader::new(cursor2);

        // Note: due to the current EtagReader implementation,
        // calling etag() without reading data first will return the empty-data hash.
        // This test verifies that the implementation is consistent.
        let etag1 = etag_reader1.etag().await;
        let etag2 = etag_reader2.etag().await;

        // Both should return the same hash (empty data hash) since no data was read
        assert_eq!(etag1, etag2, "ETags should be consistent when no data is read");
        assert_eq!(etag1, "d41d8cd98f00b204e9800998ecf8427e", "Should be empty data MD5");
    }

    #[tokio::test]
    async fn test_etag_reader_creation_with_different_data() {
        let data = b"this is a longer piece of data for testing";
        let cursor = Cursor::new(data);
        let etag_reader = EtagReader::new(cursor);

        // Test ETag computation
        let etag = etag_reader.etag().await;
        assert!(!etag.is_empty(), "ETag should not be empty");
        assert_eq!(etag.len(), 32, "MD5 hash should be 32 characters");
    }

    // #[tokio::test]
    // async fn test_file_reader_and_writer_types() {
    //     // Test that the type aliases are correctly defined
    //     let _reader: FileReader = Box::new(Cursor::new(b"test"));
    //     let (_writer_tx, writer_rx) = tokio::io::duplex(1024);
    //     let _writer: FileWriter = Box::new(writer_rx);

    //     // If this compiles, the types are correctly defined
    //     // This is a placeholder test - remove meaningless assertion
    //     // assert!(true);
    // }

    #[tokio::test]
    async fn test_etag_trait_implementation() {
        let data = b"test data for trait";
        let cursor = Cursor::new(data);
        let etag_reader = EtagReader::new(cursor);

        // Test the Etag trait
        let etag = etag_reader.etag().await;
        assert!(!etag.is_empty(), "ETag should not be empty");

        // Verify it's a valid hex string
        assert!(etag.chars().all(|c| c.is_ascii_hexdigit()), "ETag should be a valid hex string");
    }

    #[tokio::test]
    async fn test_read_buffer_size_constant() {
        assert_eq!(READ_BUFFER_SIZE, 1024 * 1024);
        // READ_BUFFER_SIZE is a compile-time constant, no need to assert
        // assert!(READ_BUFFER_SIZE > 0);
        // assert!(READ_BUFFER_SIZE % 1024 == 0, "Buffer size should be a multiple of 1024");
    }

    #[tokio::test]
    async fn test_concurrent_etag_operations() {
        let data1 = b"concurrent test data 1";
        let data2 = b"concurrent test data 2";
        let data3 = b"concurrent test data 3";

        let cursor1 = Cursor::new(data1);
        let cursor2 = Cursor::new(data2);
        let cursor3 = Cursor::new(data3);

        let etag_reader1 = EtagReader::new(cursor1);
        let etag_reader2 = EtagReader::new(cursor2);
        let etag_reader3 = EtagReader::new(cursor3);

        // Compute ETags concurrently
        let (result1, result2, result3) = tokio::join!(etag_reader1.etag(), etag_reader2.etag(), etag_reader3.etag());

        // All ETags should be the same (empty data hash) since no data was read
        assert_eq!(result1, result2);
        assert_eq!(result2, result3);
        assert_eq!(result1, result3);

        assert_eq!(result1.len(), 32);
        assert_eq!(result2.len(), 32);
        assert_eq!(result3.len(), 32);

        // All should be the empty data MD5
        assert_eq!(result1, "d41d8cd98f00b204e9800998ecf8427e");
    }

    // #[tokio::test]
    // async fn test_edge_case_parameters() {
    //     // Test HttpFileWriter with edge case parameters
    //     let writer = HttpFileWriter::new(
    //         "http://localhost:8080",
    //         "", // empty disk
    //         "", // empty volume
    //         "", // empty path
    //         0,  // zero size
    //         false,
    //     );
    //     assert!(writer.is_ok(), "HttpFileWriter should handle empty parameters");

    //     // Test HttpFileReader with edge case parameters
    //     let result = HttpFileReader::new(
    //         "http://invalid:9999",
    //         "", // empty disk
    //         "", // empty volume
    //         "", // empty path
    //         0,  // zero offset
    //         0,  // zero length
    //     )
    //     .await;
    //     // May succeed or fail, but parameters should be handled
    //     assert!(result.is_ok() || result.is_err(), "HttpFileReader creation should not panic");
    // }

    // #[tokio::test]
    // async fn test_url_encoding_edge_cases() {
    //     // Test with characters that need URL encoding
    //     let special_chars = "test file with spaces & symbols + % # ? = @ ! $ ( ) [ ] { } | \\ / : ; , . < > \" '";

    //     let writer = HttpFileWriter::new("http://localhost:8080", special_chars, special_chars, special_chars, 1024, false);
    //     assert!(writer.is_ok(), "HttpFileWriter should handle special characters");

    //     let result = HttpFileReader::new("http://invalid:9999", special_chars, special_chars, special_chars, 0, 1024).await;
    //     // May succeed or fail, but URL encoding should work
    //     assert!(result.is_ok() || result.is_err(), "HttpFileReader creation should not panic");
    // }

    #[tokio::test]
    async fn test_etag_reader_with_binary_data() {
        // Test with binary data including null bytes
        let data = vec![0u8, 1u8, 255u8, 127u8, 128u8, 0u8, 0u8, 255u8];
        let cursor = Cursor::new(data.clone());
        let etag_reader = EtagReader::new(cursor);

        // Test ETag computation for binary data
        let etag = etag_reader.etag().await;
        assert!(!etag.is_empty(), "ETag should not be empty");
        assert_eq!(etag.len(), 32, "MD5 hash should be 32 characters");
        assert!(etag.chars().all(|c| c.is_ascii_hexdigit()), "ETag should be valid hex");
    }

    #[tokio::test]
    async fn test_etag_reader_type_constraints() {
        // Test that EtagReader works with different reader types
        let data = b"type constraint test";

        // Test with Cursor
        let cursor = Cursor::new(data);
        let etag_reader = EtagReader::new(cursor);
        let etag = etag_reader.etag().await;
        assert_eq!(etag.len(), 32);

        // Test with slice
        let slice_reader = &data[..];
        let etag_reader2 = EtagReader::new(slice_reader);
        let etag2 = etag_reader2.etag().await;
        assert_eq!(etag2.len(), 32);

        // Both should produce the same hash for the same data
        assert_eq!(etag, etag2);
    }
}
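Note the shape of EtagReader above: bytes observed by `poll_read` are forwarded over an mpsc channel to a `spawn_blocking` MD5 task, and `etag()` resolves only after `inner` and `bytes_tx` are dropped. A minimal sketch of one way to drive it; note that with the `poll_read` above, data reaches the hasher only when the destination buffer fills completely, and a partial tail read surfaces as an `UnexpectedEof` error (the unit tests above lean on the empty-input MD5 for exactly this reason):

use tokio::io::AsyncReadExt;

async fn etag_of<R: tokio::io::AsyncRead + Unpin + Send>(r: R) -> String {
    let mut reader = EtagReader::new(r);
    let mut sink = vec![0u8; 8]; // hypothetical chunk size
    // Drain until EOF; the "early eof" error on the final partial chunk ends the loop.
    while let Ok(n) = reader.read(&mut sink).await {
        if n == 0 {
            break;
        }
    }
    reader.etag().await // drops the sender, letting the hashing task finish
}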
@@ -1 +0,0 @@
pub mod writer;
@@ -1,387 +0,0 @@
use crate::disk::MetaCacheEntry;
use crate::error::clone_err;
use common::error::{Error, Result};
use rmp::Marker;
use std::str::from_utf8;
use tokio::io::AsyncRead;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWrite;
use tokio::io::AsyncWriteExt;
// use std::sync::Arc;
// use tokio::sync::mpsc;
// use tokio::sync::mpsc::Sender;
// use tokio::task;

const METACACHE_STREAM_VERSION: u8 = 2;

#[derive(Debug)]
pub struct MetacacheWriter<W> {
    wr: W,
    created: bool,
    // err: Option<Error>,
    buf: Vec<u8>,
}

impl<W: AsyncWrite + Unpin> MetacacheWriter<W> {
    pub fn new(wr: W) -> Self {
        Self {
            wr,
            created: false,
            // err: None,
            buf: Vec::new(),
        }
    }

    pub async fn flush(&mut self) -> Result<()> {
        self.wr.write_all(&self.buf).await?;
        self.buf.clear();

        Ok(())
    }

    pub async fn init(&mut self) -> Result<()> {
        if !self.created {
            rmp::encode::write_u8(&mut self.buf, METACACHE_STREAM_VERSION).map_err(|e| Error::msg(format!("{:?}", e)))?;
            self.flush().await?;
            self.created = true;
        }
        Ok(())
    }

    pub async fn write(&mut self, objs: &[MetaCacheEntry]) -> Result<()> {
        if objs.is_empty() {
            return Ok(());
        }

        self.init().await?;

        for obj in objs.iter() {
            if obj.name.is_empty() {
                return Err(Error::msg("metacacheWriter: no name"));
            }

            self.write_obj(obj).await?;
        }

        Ok(())
    }

    pub async fn write_obj(&mut self, obj: &MetaCacheEntry) -> Result<()> {
        self.init().await?;

        rmp::encode::write_bool(&mut self.buf, true).map_err(|e| Error::msg(format!("{:?}", e)))?;
        rmp::encode::write_str(&mut self.buf, &obj.name).map_err(|e| Error::msg(format!("{:?}", e)))?;
        rmp::encode::write_bin(&mut self.buf, &obj.metadata).map_err(|e| Error::msg(format!("{:?}", e)))?;
        self.flush().await?;

        Ok(())
    }

    // pub async fn stream(&mut self) -> Result<Sender<MetaCacheEntry>> {
    //     let (sender, mut receiver) = mpsc::channel::<MetaCacheEntry>(100);

    //     let wr = Arc::new(self);

    //     task::spawn(async move {
    //         while let Some(obj) = receiver.recv().await {
    //             // if obj.name.is_empty() || self.err.is_some() {
    //             //     continue;
    //             // }

    //             let _ = wr.write_obj(&obj);

    //             // if let Err(err) = rmp::encode::write_bool(&mut self.wr, true) {
    //             //     self.err = Some(Error::new(err));
    //             //     continue;
    //             // }

    //             // if let Err(err) = rmp::encode::write_str(&mut self.wr, &obj.name) {
    //             //     self.err = Some(Error::new(err));
    //             //     continue;
    //             // }

    //             // if let Err(err) = rmp::encode::write_bin(&mut self.wr, &obj.metadata) {
    //             //     self.err = Some(Error::new(err));
    //             //     continue;
    //             // }
    //         }
    //     });

    //     Ok(sender)
    // }

    pub async fn close(&mut self) -> Result<()> {
        rmp::encode::write_bool(&mut self.buf, false).map_err(|e| Error::msg(format!("{:?}", e)))?;
        self.flush().await?;
        Ok(())
    }
}

pub struct MetacacheReader<R> {
    rd: R,
    init: bool,
    err: Option<Error>,
    buf: Vec<u8>,
    offset: usize,

    current: Option<MetaCacheEntry>,
}

impl<R: AsyncRead + Unpin> MetacacheReader<R> {
    pub fn new(rd: R) -> Self {
        Self {
            rd,
            init: false,
            err: None,
            buf: Vec::new(),
            offset: 0,
            current: None,
        }
    }

    pub async fn read_more(&mut self, read_size: usize) -> Result<&[u8]> {
        let ext_size = read_size + self.offset;

        let extra = ext_size - self.offset;
        if self.buf.capacity() >= ext_size {
            // Extend the buffer if we have enough space.
            self.buf.resize(ext_size, 0);
        } else {
            self.buf.extend(vec![0u8; extra]);
        }

        let pref = self.offset;

        self.rd.read_exact(&mut self.buf[pref..ext_size]).await?;

        self.offset += read_size;

        let data = &self.buf[pref..ext_size];

        Ok(data)
    }

    fn reset(&mut self) {
        self.buf.clear();
        self.offset = 0;
    }

    async fn check_init(&mut self) -> Result<()> {
        if !self.init {
            let ver = match rmp::decode::read_u8(&mut self.read_more(2).await?) {
                Ok(res) => res,
                Err(err) => {
                    self.err = Some(Error::msg(format!("{:?}", err)));
                    0
                }
            };
            match ver {
                1 | 2 => (),
                _ => {
                    self.err = Some(Error::msg("invalid version"));
                }
            }

            self.init = true;
        }
        Ok(())
    }

    async fn read_str_len(&mut self) -> Result<u32> {
        let mark = match rmp::decode::read_marker(&mut self.read_more(1).await?) {
            Ok(res) => res,
            Err(err) => {
                let serr = format!("{:?}", err);
                self.err = Some(Error::msg(&serr));
                return Err(Error::msg(&serr));
            }
        };

        match mark {
            Marker::FixStr(size) => Ok(u32::from(size)),
            Marker::Str8 => Ok(u32::from(self.read_u8().await?)),
            Marker::Str16 => Ok(u32::from(self.read_u16().await?)),
            Marker::Str32 => Ok(self.read_u32().await?),
            _marker => Err(Error::msg("str marker err")),
        }
    }

    async fn read_bin_len(&mut self) -> Result<u32> {
        let mark = match rmp::decode::read_marker(&mut self.read_more(1).await?) {
            Ok(res) => res,
            Err(err) => {
                let serr = format!("{:?}", err);
                self.err = Some(Error::msg(&serr));
                return Err(Error::msg(&serr));
            }
        };

        match mark {
            Marker::Bin8 => Ok(u32::from(self.read_u8().await?)),
            Marker::Bin16 => Ok(u32::from(self.read_u16().await?)),
            Marker::Bin32 => Ok(self.read_u32().await?),
            _ => Err(Error::msg("bin marker err")),
        }
    }

    async fn read_u8(&mut self) -> Result<u8> {
        let buf = self.read_more(1).await?;

        Ok(u8::from_be_bytes(buf.try_into().expect("Slice with incorrect length")))
    }

    async fn read_u16(&mut self) -> Result<u16> {
        let buf = self.read_more(2).await?;

        Ok(u16::from_be_bytes(buf.try_into().expect("Slice with incorrect length")))
    }

    async fn read_u32(&mut self) -> Result<u32> {
        let buf = self.read_more(4).await?;

        Ok(u32::from_be_bytes(buf.try_into().expect("Slice with incorrect length")))
    }

    pub async fn skip(&mut self, size: usize) -> Result<()> {
        self.check_init().await?;

        if let Some(err) = &self.err {
            return Err(clone_err(err));
        }

        let mut n = size;

        if self.current.is_some() {
            n -= 1;
            self.current = None;
        }

        while n > 0 {
            match rmp::decode::read_bool(&mut self.read_more(1).await?) {
                Ok(res) => {
                    if !res {
                        return Ok(());
                    }
                }
                Err(err) => {
                    let serr = format!("{:?}", err);
                    self.err = Some(Error::msg(&serr));
                    return Err(Error::msg(&serr));
                }
            };

            let l = self.read_str_len().await?;
            let _ = self.read_more(l as usize).await?;
            let l = self.read_bin_len().await?;
            let _ = self.read_more(l as usize).await?;

            n -= 1;
        }

        Ok(())
    }

    pub async fn peek(&mut self) -> Result<Option<MetaCacheEntry>> {
        self.check_init().await?;

        if let Some(err) = &self.err {
            return Err(clone_err(err));
        }

        match rmp::decode::read_bool(&mut self.read_more(1).await?) {
            Ok(res) => {
                if !res {
                    return Ok(None);
                }
            }
            Err(err) => {
                let serr = format!("{:?}", err);
                self.err = Some(Error::msg(&serr));
                return Err(Error::msg(&serr));
            }
        };

        let l = self.read_str_len().await?;

        let buf = self.read_more(l as usize).await?;
        let name_buf = buf.to_vec();
        let name = match from_utf8(&name_buf) {
            Ok(decoded) => decoded.to_owned(),
            Err(err) => {
                self.err = Some(Error::msg(err.to_string()));
                return Err(Error::msg(err.to_string()));
            }
        };

        let l = self.read_bin_len().await?;

        let buf = self.read_more(l as usize).await?;

        let metadata = buf.to_vec();

        self.reset();

        let entry = Some(MetaCacheEntry {
            name,
            metadata,
            cached: None,
            reusable: false,
        });
        self.current = entry.clone();

        Ok(entry)
    }

    pub async fn read_all(&mut self) -> Result<Vec<MetaCacheEntry>> {
        let mut ret = Vec::new();

        loop {
            if let Some(entry) = self.peek().await? {
                ret.push(entry);
                continue;
            }

            break;
        }

        Ok(ret)
    }
}

#[tokio::test]
async fn test_writer() {
    use std::io::Cursor;

    let mut f = Cursor::new(Vec::new());

    let mut w = MetacacheWriter::new(&mut f);

    let mut objs = Vec::new();
    for i in 0..10 {
        let info = MetaCacheEntry {
            name: format!("item{}", i),
            metadata: vec![0u8, 10],
            cached: None,
            reusable: false,
        };
        println!("old {:?}", &info);
        objs.push(info);
    }

    w.write(&objs).await.unwrap();

    w.close().await.unwrap();

    let data = f.into_inner();

    let nf = Cursor::new(data);

    let mut r = MetacacheReader::new(nf);
    let nobjs = r.read_all().await.unwrap();

    // for info in nobjs.iter() {
    //     println!("new {:?}", &info);
    // }

    assert_eq!(objs, nobjs)
}
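For reference, the stream format produced above is: a msgpack u8 version (`METACACHE_STREAM_VERSION`), then per entry a bool `true`, a str name, and a bin metadata blob, terminated by the bool `false` that `close()` writes. A minimal round-trip sketch mirroring `test_writer` above (the entry name is hypothetical):

use std::io::Cursor;

async fn metacache_roundtrip_demo() -> Result<()> {
    let mut buf = Cursor::new(Vec::new());
    let mut w = MetacacheWriter::new(&mut buf);
    w.write_obj(&MetaCacheEntry {
        name: "bucket/object.txt".to_string(),
        metadata: vec![0u8; 4],
        cached: None,
        reusable: false,
    })
    .await?;
    w.close().await?; // writes the terminating `false`
    drop(w); // release the borrow on `buf`

    let mut r = MetacacheReader::new(Cursor::new(buf.into_inner()));
    assert_eq!(r.read_all().await?.len(), 1);
    Ok(())
}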
@@ -3,6 +3,7 @@ use std::collections::{HashMap, HashSet};
 use chrono::Utc;
 use common::globals::{GLOBAL_Local_Node_Name, GLOBAL_Rustfs_Addr};
 use madmin::metrics::{DiskIOStats, DiskMetric, RealtimeMetrics};
+use rustfs_utils::os::get_drive_stats;
 use serde::{Deserialize, Serialize};
 use tracing::info;

@@ -14,7 +15,7 @@ use crate::{
     },
     new_object_layer_fn,
     store_api::StorageAPI,
-    utils::os::get_drive_stats,
+    // utils::os::get_drive_stats,
 };

 #[derive(Debug, Default, Serialize, Deserialize)]
@@ -1,268 +0,0 @@
|
||||
use crate::{disk::error::DiskError, error::clone_err};
|
||||
use common::error::Error;
|
||||
use std::{collections::HashMap, fmt::Debug};
|
||||
// pub type CheckErrorFn = fn(e: &Error) -> bool;
|
||||
|
||||
pub trait CheckErrorFn: Debug + Send + Sync + 'static {
|
||||
fn is(&self, e: &Error) -> bool;
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, thiserror::Error)]
|
||||
pub enum QuorumError {
|
||||
#[error("Read quorum not met")]
|
||||
Read,
|
||||
#[error("disk not found")]
|
||||
Write,
|
||||
}
|
||||
|
||||
impl QuorumError {
|
||||
pub fn to_u32(&self) -> u32 {
|
||||
match self {
|
||||
QuorumError::Read => 0x01,
|
||||
QuorumError::Write => 0x02,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn from_u32(error: u32) -> Option<Self> {
|
||||
match error {
|
||||
0x01 => Some(QuorumError::Read),
|
||||
0x02 => Some(QuorumError::Write),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn base_ignored_errs() -> Vec<Box<dyn CheckErrorFn>> {
|
||||
vec![
|
||||
Box::new(DiskError::DiskNotFound),
|
||||
Box::new(DiskError::FaultyDisk),
|
||||
Box::new(DiskError::FaultyRemoteDisk),
|
||||
]
|
||||
}
|
||||
|
||||
// object_op_ignored_errs
|
||||
pub fn object_op_ignored_errs() -> Vec<Box<dyn CheckErrorFn>> {
|
||||
let mut base = base_ignored_errs();
|
||||
|
||||
let ext: Vec<Box<dyn CheckErrorFn>> = vec![
|
||||
// Box::new(DiskError::DiskNotFound),
|
||||
// Box::new(DiskError::FaultyDisk),
|
||||
// Box::new(DiskError::FaultyRemoteDisk),
|
||||
Box::new(DiskError::DiskAccessDenied),
|
||||
Box::new(DiskError::UnformattedDisk),
|
||||
Box::new(DiskError::DiskOngoingReq),
|
||||
];
|
||||
|
||||
base.extend(ext);
|
||||
base
|
||||
}
|
||||
|
||||
// bucket_op_ignored_errs
|
||||
pub fn bucket_op_ignored_errs() -> Vec<Box<dyn CheckErrorFn>> {
|
||||
let mut base = base_ignored_errs();
|
||||
|
||||
let ext: Vec<Box<dyn CheckErrorFn>> = vec![Box::new(DiskError::DiskAccessDenied), Box::new(DiskError::UnformattedDisk)];
|
||||
|
||||
base.extend(ext);
|
||||
base
|
||||
}
|
||||
|
||||
// 用于检查错误是否被忽略的函数
|
||||
fn is_err_ignored(err: &Error, ignored_errs: &[Box<dyn CheckErrorFn>]) -> bool {
|
||||
ignored_errs.iter().any(|ignored_err| ignored_err.is(err))
|
||||
}
|
||||
|
||||
// 减少错误数量并返回出现次数最多的错误
|
||||
fn reduce_errs(errs: &[Option<Error>], ignored_errs: &[Box<dyn CheckErrorFn>]) -> (usize, Option<Error>) {
|
||||
let mut error_counts: HashMap<String, usize> = HashMap::new();
|
||||
let mut error_map: HashMap<String, usize> = HashMap::new(); // 存 err 位置
|
||||
let nil = "nil".to_string();
|
||||
for (i, operr) in errs.iter().enumerate() {
|
||||
if let Some(err) = operr {
|
||||
if is_err_ignored(err, ignored_errs) {
|
||||
continue;
|
||||
}
|
||||
|
||||
let errstr = err.inner_string();
|
||||
|
||||
let _ = *error_map.entry(errstr.clone()).or_insert(i);
|
||||
*error_counts.entry(errstr.clone()).or_insert(0) += 1;
|
||||
} else {
|
||||
*error_counts.entry(nil.clone()).or_insert(0) += 1;
|
||||
let _ = *error_map.entry(nil.clone()).or_insert(i);
|
||||
continue;
|
||||
}
|
||||
|
||||
// let err = operr.as_ref().unwrap();
|
||||
|
||||
// let errstr = err.to_string();
|
||||
|
||||
// let _ = *error_map.entry(errstr.clone()).or_insert(i);
|
||||
// *error_counts.entry(errstr.clone()).or_insert(0) += 1;
|
||||
}
|
||||
|
||||
let mut max = 0;
|
||||
let mut max_err = nil.clone();
|
||||
for (err, &count) in error_counts.iter() {
|
||||
if count > max || (count == max && *err == nil) {
|
||||
max = count;
|
||||
max_err.clone_from(err);
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(&err_idx) = error_map.get(&max_err) {
|
||||
let err = errs[err_idx].as_ref().map(clone_err);
|
||||
(max, err)
|
||||
} else if max_err == nil {
|
||||
(max, None)
|
||||
} else {
|
||||
(0, None)
|
||||
}
|
||||
}
|
||||
|
||||
// 根据 quorum 验证错误数量
|
||||
fn reduce_quorum_errs(
|
||||
errs: &[Option<Error>],
|
||||
ignored_errs: &[Box<dyn CheckErrorFn>],
|
||||
quorum: usize,
|
||||
quorum_err: QuorumError,
|
||||
) -> Option<Error> {
|
||||
let (max_count, max_err) = reduce_errs(errs, ignored_errs);
|
||||
if max_count >= quorum {
|
||||
max_err
|
||||
} else {
|
||||
Some(Error::new(quorum_err))
|
||||
}
|
||||
}
|
||||
|
||||
// 根据读 quorum 验证错误数量
|
||||
// 返回最大错误数量的下标,或 QuorumError
|
||||
pub fn reduce_read_quorum_errs(
|
||||
errs: &[Option<Error>],
|
||||
ignored_errs: &[Box<dyn CheckErrorFn>],
|
||||
read_quorum: usize,
|
||||
) -> Option<Error> {
|
||||
reduce_quorum_errs(errs, ignored_errs, read_quorum, QuorumError::Read)
|
||||
}
|
||||
|
||||
// 根据写 quorum 验证错误数量
|
||||
// 返回最大错误数量的下标,或 QuorumError
|
||||
#[tracing::instrument(level = "info", skip_all)]
|
||||
pub fn reduce_write_quorum_errs(
|
||||
errs: &[Option<Error>],
|
||||
ignored_errs: &[Box<dyn CheckErrorFn>],
|
||||
write_quorum: usize,
|
||||
) -> Option<Error> {
|
||||
reduce_quorum_errs(errs, ignored_errs, write_quorum, QuorumError::Write)
|
||||
}

#[cfg(test)]
mod tests {
    use super::*;

    #[derive(Debug)]
    struct MockErrorChecker {
        target_error: String,
    }

    impl CheckErrorFn for MockErrorChecker {
        fn is(&self, e: &Error) -> bool {
            e.inner_string() == self.target_error
        }
    }

    fn mock_error(message: &str) -> Error {
        Error::msg(message.to_string())
    }

    #[test]
    fn test_reduce_errs_with_no_errors() {
        let errs: Vec<Option<Error>> = vec![];
        let ignored_errs: Vec<Box<dyn CheckErrorFn>> = vec![];

        let (count, err) = reduce_errs(&errs, &ignored_errs);

        assert_eq!(count, 0);
        assert!(err.is_none());
    }

    #[test]
    fn test_reduce_errs_with_ignored_errors() {
        let errs = vec![Some(mock_error("ignored_error")), Some(mock_error("ignored_error"))];
        let ignored_errs: Vec<Box<dyn CheckErrorFn>> = vec![Box::new(MockErrorChecker {
            target_error: "ignored_error".to_string(),
        })];

        let (count, err) = reduce_errs(&errs, &ignored_errs);

        assert_eq!(count, 0);
        assert!(err.is_none());
    }

    #[test]
    fn test_reduce_errs_with_mixed_errors() {
        let errs = vec![
            Some(Error::new(DiskError::FileNotFound)),
            Some(Error::new(DiskError::FileNotFound)),
            Some(Error::new(DiskError::FileNotFound)),
            Some(Error::new(DiskError::FileNotFound)),
            Some(Error::new(DiskError::FileNotFound)),
            Some(Error::new(DiskError::FileNotFound)),
            Some(Error::new(DiskError::FileNotFound)),
            Some(Error::new(DiskError::FileNotFound)),
            Some(Error::new(DiskError::FileNotFound)),
        ];
        let ignored_errs: Vec<Box<dyn CheckErrorFn>> = vec![Box::new(MockErrorChecker {
            target_error: "error2".to_string(),
        })];

        let (count, err) = reduce_errs(&errs, &ignored_errs);
        println!("count: {}, err: {:?}", count, err);
        assert_eq!(count, 9);
        assert_eq!(err.unwrap().to_string(), DiskError::FileNotFound.to_string());
    }

    #[test]
    fn test_reduce_errs_with_nil_errors() {
        let errs = vec![None, Some(mock_error("error1")), None];
        let ignored_errs: Vec<Box<dyn CheckErrorFn>> = vec![];

        let (count, err) = reduce_errs(&errs, &ignored_errs);

        assert_eq!(count, 2);
        assert!(err.is_none());
    }
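
    // Added illustration (not in the original tests): on a tie between the
    // nil bucket and an error, reduce_errs prefers nil, so an evenly split
    // result still reports success.
    #[test]
    fn test_reduce_errs_prefers_nil_on_tie() {
        let errs = vec![None, Some(mock_error("error1"))];
        let ignored_errs: Vec<Box<dyn CheckErrorFn>> = vec![];

        let (count, err) = reduce_errs(&errs, &ignored_errs);

        assert_eq!(count, 1);
        assert!(err.is_none());
    }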

    #[test]
    fn test_reduce_read_quorum_errs() {
        let errs = vec![
            Some(mock_error("error1")),
            Some(mock_error("error1")),
            Some(mock_error("error2")),
            None,
            None,
        ];
        let ignored_errs: Vec<Box<dyn CheckErrorFn>> = vec![];
        let read_quorum = 2;

        let result = reduce_read_quorum_errs(&errs, &ignored_errs, read_quorum);

        assert!(result.is_none());
    }

    #[test]
    fn test_reduce_write_quorum_errs_with_quorum_error() {
        let errs = vec![
            Some(mock_error("error1")),
            Some(mock_error("error2")),
            Some(mock_error("error2")),
        ];
        let ignored_errs: Vec<Box<dyn CheckErrorFn>> = vec![];
        let write_quorum = 3;

        let result = reduce_write_quorum_errs(&errs, &ignored_errs, write_quorum);

        assert!(result.is_some());
        assert_eq!(result.unwrap().to_string(), QuorumError::Write.to_string());
    }
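
    // Added illustration (not in the original tests): when the most frequent
    // outcome is success and it meets the quorum, no error is reported.
    #[test]
    fn test_reduce_write_quorum_errs_meets_quorum() {
        let errs: Vec<Option<Error>> = vec![None, None, None];
        let ignored_errs: Vec<Box<dyn CheckErrorFn>> = vec![];
        let write_quorum = 3;

        let result = reduce_write_quorum_errs(&errs, &ignored_errs, write_quorum);

        assert!(result.is_none());
    }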
}
@@ -4,7 +4,7 @@ pub mod ellipses;
pub mod fs;
pub mod hash;
pub mod net;
pub mod os;
// pub mod os;
pub mod path;
pub mod wildcard;
pub mod xml;
@@ -1,178 +0,0 @@
use nix::sys::stat::{self, stat};
use nix::sys::statfs::{self, FsType, statfs};
use std::fs::File;
use std::io::{self, BufRead, Error, ErrorKind, Result};
use std::path::Path;

use crate::disk::Info;

use super::IOStats;

/// returns total and free bytes available in a directory, e.g. `/`.
pub fn get_info(p: impl AsRef<Path>) -> std::io::Result<Info> {
    let stat_fs = statfs(p.as_ref())?;

    let bsize = stat_fs.block_size() as u64;
    let bfree = stat_fs.blocks_free() as u64;
    let bavail = stat_fs.blocks_available() as u64;
    let blocks = stat_fs.blocks() as u64;

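    // Worked example with illustrative numbers (not from the original source):
    // with bsize = 4096, blocks = 1000, bfree = 200 and bavail = 150, the
    // root-reserved block count is 200 - 150 = 50, so
    // total = (1000 - 50) * 4096, free = 150 * 4096, and used = total - free.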
    let reserved = match bfree.checked_sub(bavail) {
        Some(reserved) => reserved,
        None => {
            return Err(Error::other(format!(
                "detected f_bavail space ({}) > f_bfree space ({}), fs corruption at ({}). please run 'fsck'",
                bavail,
                bfree,
                p.as_ref().display()
            )));
        }
    };

    let total = match blocks.checked_sub(reserved) {
        Some(total) => total * bsize,
        None => {
            return Err(Error::other(format!(
                "detected reserved space ({}) > blocks space ({}), fs corruption at ({}). please run 'fsck'",
                reserved,
                blocks,
                p.as_ref().display()
            )));
        }
    };

    let free = bavail * bsize;
    let used = match total.checked_sub(free) {
        Some(used) => used,
        None => {
            return Err(Error::other(format!(
                "detected free space ({}) > total drive space ({}), fs corruption at ({}). please run 'fsck'",
                free,
                total,
                p.as_ref().display()
            )));
        }
    };

    let st = stat(p.as_ref())?;

    Ok(Info {
        total,
        free,
        used,
        files: stat_fs.files(),
        ffree: stat_fs.files_free(),
        fstype: get_fs_type(stat_fs.filesystem_type()).to_string(),

        major: stat::major(st.st_dev),
        minor: stat::minor(st.st_dev),

        ..Default::default()
    })
}

/// returns the filesystem type of the underlying mounted filesystem
///
/// TODO The following mapping could not find the corresponding constant in `nix`:
///
/// "137d" => "EXT",
/// "4244" => "HFS",
/// "5346544e" => "NTFS",
/// "61756673" => "AUFS",
/// "ef51" => "EXT2OLD",
/// "2fc12fc1" => "zfs",
/// "ff534d42" => "cifs",
/// "53464846" => "wslfs",
fn get_fs_type(fs_type: FsType) -> &'static str {
    match fs_type {
        statfs::TMPFS_MAGIC => "TMPFS",
        statfs::MSDOS_SUPER_MAGIC => "MSDOS",
        // statfs::XFS_SUPER_MAGIC => "XFS",
        statfs::NFS_SUPER_MAGIC => "NFS",
        statfs::EXT4_SUPER_MAGIC => "EXT4",
        statfs::ECRYPTFS_SUPER_MAGIC => "ecryptfs",
        statfs::OVERLAYFS_SUPER_MAGIC => "overlayfs",
        statfs::REISERFS_SUPER_MAGIC => "REISERFS",

        _ => "UNKNOWN",
    }
}

pub fn same_disk(disk1: &str, disk2: &str) -> Result<bool> {
    let stat1 = stat(disk1)?;
    let stat2 = stat(disk2)?;

    Ok(stat1.st_dev == stat2.st_dev)
}

pub fn get_drive_stats(major: u32, minor: u32) -> Result<IOStats> {
    read_drive_stats(&format!("/sys/dev/block/{}:{}/stat", major, minor))
}

fn read_drive_stats(stats_file: &str) -> Result<IOStats> {
    let stats = read_stat(stats_file)?;
    if stats.len() < 11 {
        return Err(Error::other(format!("found invalid format while reading {}", stats_file)));
    }
    let mut io_stats = IOStats {
        read_ios: stats[0],
        read_merges: stats[1],
        read_sectors: stats[2],
        read_ticks: stats[3],
        write_ios: stats[4],
        write_merges: stats[5],
        write_sectors: stats[6],
        write_ticks: stats[7],
        current_ios: stats[8],
        total_ticks: stats[9],
        req_ticks: stats[10],
        ..Default::default()
    };

    if stats.len() > 14 {
        io_stats.discard_ios = stats[11];
        io_stats.discard_merges = stats[12];
        io_stats.discard_sectors = stats[13];
        io_stats.discard_ticks = stats[14];
    }
    Ok(io_stats)
}

fn read_stat(file_name: &str) -> Result<Vec<u64>> {
    // Open the file
    let path = Path::new(file_name);
    let file = File::open(path)?;

    // Wrap it in a BufReader
    let reader = io::BufReader::new(file);

    // Read the first line
    let mut stats = Vec::new();
    if let Some(line) = reader.lines().next() {
        let line = line?;
        // Split the line and parse each token as u64
        // https://rust-lang.github.io/rust-clippy/master/index.html#trim_split_whitespace
        for token in line.split_whitespace() {
            let ui64: u64 = token
                .parse()
                .map_err(|e| Error::new(ErrorKind::InvalidData, format!("Failed to parse token '{}': {}", token, e)))?;
            stats.push(ui64);
        }
    }

    Ok(stats)
}

#[cfg(test)]
mod test {
    use super::get_drive_stats;

    #[ignore] // FIXME: failed in github actions
    #[test]
    fn test_stats() {
        let major = 7;
        let minor = 11;
        let s = get_drive_stats(major, minor).unwrap();
        println!("{:?}", s);
    }
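
    // A hedged sketch (not in the original file): read_stat expects the first
    // line of the stat file to hold whitespace-separated u64 counters, so a
    // temporary file with eleven numbers should round-trip. Assumes the
    // `tempfile` dev-dependency already used by the other tests here.
    #[test]
    fn test_read_stat_parses_whitespace_counters() {
        use std::io::Write;

        let mut f = tempfile::NamedTempFile::new().unwrap();
        writeln!(f, "1 2 3 4 5 6 7 8 9 10 11").unwrap();
        let stats = super::read_stat(f.path().to_str().unwrap()).unwrap();
        assert_eq!(stats, vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]);
    }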
}
@@ -1,338 +0,0 @@
#[cfg(target_os = "linux")]
mod linux;
#[cfg(all(unix, not(target_os = "linux")))]
mod unix;
#[cfg(target_os = "windows")]
mod windows;

#[cfg(target_os = "linux")]
pub use linux::{get_drive_stats, get_info, same_disk};
// pub use linux::same_disk;

#[cfg(all(unix, not(target_os = "linux")))]
pub use unix::{get_drive_stats, get_info, same_disk};
#[cfg(target_os = "windows")]
pub use windows::{get_drive_stats, get_info, same_disk};

#[derive(Debug, Default, PartialEq)]
pub struct IOStats {
    pub read_ios: u64,
    pub read_merges: u64,
    pub read_sectors: u64,
    pub read_ticks: u64,
    pub write_ios: u64,
    pub write_merges: u64,
    pub write_sectors: u64,
    pub write_ticks: u64,
    pub current_ios: u64,
    pub total_ticks: u64,
    pub req_ticks: u64,
    pub discard_ios: u64,
    pub discard_merges: u64,
    pub discard_sectors: u64,
    pub discard_ticks: u64,
    pub flush_ios: u64,
    pub flush_ticks: u64,
}
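// Editorial note (an assumption, not from the original file): the field order
// appears to mirror the kernel block-device stat layout parsed by
// read_drive_stats in the Linux backend above, where columns 0-10 feed the
// read/write/in-flight counters and columns 11-14 the discard counters; the
// flush counters exist in newer kernels but are not populated by that parser.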

#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    #[test]
    fn test_get_info_valid_path() {
        let temp_dir = tempfile::tempdir().unwrap();
        let info = get_info(temp_dir.path()).unwrap();

        println!("Disk Info: {:?}", info);

        assert!(info.total > 0);
        assert!(info.free > 0);
        assert!(info.used > 0);
        assert!(info.files > 0);
        assert!(info.ffree > 0);
        assert!(!info.fstype.is_empty());
    }

    #[test]
    fn test_get_info_invalid_path() {
        let invalid_path = PathBuf::from("/invalid/path");
        let result = get_info(&invalid_path);

        assert!(result.is_err());
    }

    #[test]
    fn test_same_disk_same_path() {
        let temp_dir = tempfile::tempdir().unwrap();
        let path = temp_dir.path().to_str().unwrap();

        let result = same_disk(path, path).unwrap();
        assert!(result);
    }

    #[test]
    fn test_same_disk_different_paths() {
        let temp_dir1 = tempfile::tempdir().unwrap();
        let temp_dir2 = tempfile::tempdir().unwrap();

        let path1 = temp_dir1.path().to_str().unwrap();
        let path2 = temp_dir2.path().to_str().unwrap();

        let result = same_disk(path1, path2).unwrap();
        // Note: on many systems temporary directories are on the same disk.
        // This test mainly verifies the function works without error;
        // the actual result depends on the system configuration.
        println!("Same disk result for temp dirs: {}", result);

        // The function returns a boolean value as expected
        let _: bool = result; // Type assertion to verify return type
    }

    #[test]
    fn test_get_drive_stats_default() {
        let stats = get_drive_stats(0, 0).unwrap();
        assert_eq!(stats, IOStats::default());
    }

    #[test]
    fn test_iostats_default_values() {
        // Test that IOStats default values are all zero
        let stats = IOStats::default();

        assert_eq!(stats.read_ios, 0);
        assert_eq!(stats.read_merges, 0);
        assert_eq!(stats.read_sectors, 0);
        assert_eq!(stats.read_ticks, 0);
        assert_eq!(stats.write_ios, 0);
        assert_eq!(stats.write_merges, 0);
        assert_eq!(stats.write_sectors, 0);
        assert_eq!(stats.write_ticks, 0);
        assert_eq!(stats.current_ios, 0);
        assert_eq!(stats.total_ticks, 0);
        assert_eq!(stats.req_ticks, 0);
        assert_eq!(stats.discard_ios, 0);
        assert_eq!(stats.discard_merges, 0);
        assert_eq!(stats.discard_sectors, 0);
        assert_eq!(stats.discard_ticks, 0);
        assert_eq!(stats.flush_ios, 0);
        assert_eq!(stats.flush_ticks, 0);
    }

    #[test]
    fn test_iostats_equality() {
        // Test IOStats equality comparison
        let stats1 = IOStats::default();
        let stats2 = IOStats::default();
        assert_eq!(stats1, stats2);

        let stats3 = IOStats {
            read_ios: 100,
            write_ios: 50,
            ..Default::default()
        };
        let stats4 = IOStats {
            read_ios: 100,
            write_ios: 50,
            ..Default::default()
        };
        assert_eq!(stats3, stats4);

        // Test inequality
        assert_ne!(stats1, stats3);
    }

    #[test]
    fn test_iostats_debug_format() {
        // Test Debug trait implementation
        let stats = IOStats {
            read_ios: 123,
            write_ios: 456,
            total_ticks: 789,
            ..Default::default()
        };

        let debug_str = format!("{:?}", stats);
        assert!(debug_str.contains("read_ios: 123"));
        assert!(debug_str.contains("write_ios: 456"));
        assert!(debug_str.contains("total_ticks: 789"));
    }

    #[test]
    fn test_iostats_partial_eq() {
        // Test PartialEq trait implementation with various field combinations
        let base_stats = IOStats {
            read_ios: 10,
            write_ios: 20,
            read_sectors: 100,
            write_sectors: 200,
            ..Default::default()
        };

        let same_stats = IOStats {
            read_ios: 10,
            write_ios: 20,
            read_sectors: 100,
            write_sectors: 200,
            ..Default::default()
        };

        let different_read = IOStats {
            read_ios: 11, // Different
            write_ios: 20,
            read_sectors: 100,
            write_sectors: 200,
            ..Default::default()
        };

        assert_eq!(base_stats, same_stats);
        assert_ne!(base_stats, different_read);
    }

    #[test]
    fn test_get_info_path_edge_cases() {
        // Test with root directory (should work on most systems)
        #[cfg(unix)]
        {
            let result = get_info(std::path::Path::new("/"));
            assert!(result.is_ok(), "Root directory should be accessible");

            if let Ok(info) = result {
                assert!(info.total > 0, "Root filesystem should have non-zero total space");
                assert!(!info.fstype.is_empty(), "Root filesystem should have a type");
            }
        }

        #[cfg(windows)]
        {
            let result = get_info(std::path::Path::new("C:\\"));
            // On Windows, C:\ might not always exist, so we don't assert success
            if let Ok(info) = result {
                assert!(info.total > 0);
                assert!(!info.fstype.is_empty());
            }
        }
    }

    #[test]
    fn test_get_info_nonexistent_path() {
        // Test with various types of invalid paths
        let invalid_paths = [
            "/this/path/definitely/does/not/exist/anywhere",
            "/dev/null/invalid", // /dev/null is a file, not a directory
            "",                  // Empty path
        ];

        for invalid_path in &invalid_paths {
            let result = get_info(std::path::Path::new(invalid_path));
            assert!(result.is_err(), "Invalid path should return error: {}", invalid_path);
        }
    }

    #[test]
    fn test_same_disk_edge_cases() {
        // Test with same path (should always be true)
        let temp_dir = tempfile::tempdir().unwrap();
        let path_str = temp_dir.path().to_str().unwrap();

        let result = same_disk(path_str, path_str);
        assert!(result.is_ok());
        assert!(result.unwrap(), "Same path should be on same disk");

        // Test with parent and child directories (should be on same disk)
        let child_dir = temp_dir.path().join("child");
        std::fs::create_dir(&child_dir).unwrap();
        let child_path = child_dir.to_str().unwrap();

        let result = same_disk(path_str, child_path);
        assert!(result.is_ok());
        assert!(result.unwrap(), "Parent and child should be on same disk");
    }

    #[test]
    fn test_same_disk_invalid_paths() {
        // Test with invalid paths
        let temp_dir = tempfile::tempdir().unwrap();
        let valid_path = temp_dir.path().to_str().unwrap();
        let invalid_path = "/this/path/does/not/exist";

        let result1 = same_disk(valid_path, invalid_path);
        assert!(result1.is_err(), "Should fail with one invalid path");

        let result2 = same_disk(invalid_path, valid_path);
        assert!(result2.is_err(), "Should fail with one invalid path");

        let result3 = same_disk(invalid_path, invalid_path);
        assert!(result3.is_err(), "Should fail with both invalid paths");
    }

    #[test]
    fn test_iostats_field_ranges() {
        // Test that IOStats can handle large values
        let large_stats = IOStats {
            read_ios: u64::MAX,
            write_ios: u64::MAX,
            read_sectors: u64::MAX,
            write_sectors: u64::MAX,
            total_ticks: u64::MAX,
            ..Default::default()
        };

        // Should be able to create and compare
        let another_large = IOStats {
            read_ios: u64::MAX,
            write_ios: u64::MAX,
            read_sectors: u64::MAX,
            write_sectors: u64::MAX,
            total_ticks: u64::MAX,
            ..Default::default()
        };

        assert_eq!(large_stats, another_large);
    }

    #[test]
    fn test_get_drive_stats_error_handling() {
        // Test with potentially invalid major/minor numbers.
        // Note: this might succeed on some systems, so we just ensure it doesn't panic.
        let result1 = get_drive_stats(999, 999);
        // Don't assert success/failure as it's platform-dependent
        let _ = result1;

        let result2 = get_drive_stats(u32::MAX, u32::MAX);
        let _ = result2;
    }

    #[cfg(unix)]
    #[test]
    fn test_unix_specific_paths() {
        // Test Unix-specific paths
        let unix_paths = ["/tmp", "/var", "/usr"];

        for path in &unix_paths {
            if std::path::Path::new(path).exists() {
                let result = get_info(std::path::Path::new(path));
                if result.is_ok() {
                    let info = result.unwrap();
                    assert!(info.total > 0, "Path {} should have non-zero total space", path);
                }
            }
        }
    }

    #[test]
    fn test_iostats_clone_and_copy() {
        // IOStats only derives Debug, Default and PartialEq (no Clone), so
        // this test exercises Debug formatting with non-default values.
        let original = IOStats {
            read_ios: 42,
            write_ios: 84,
            ..Default::default()
        };

        let debug_output = format!("{:?}", original);
        assert!(debug_output.contains("42"));
        assert!(debug_output.contains("84"));
    }
}
@@ -1,73 +0,0 @@
use super::IOStats;
use crate::disk::Info;
use nix::sys::{stat::stat, statfs::statfs};
use std::io::{Error, Result};
use std::path::Path;

/// returns total and free bytes available in a directory, e.g. `/`.
pub fn get_info(p: impl AsRef<Path>) -> std::io::Result<Info> {
    let stat = statfs(p.as_ref())?;

    let bsize = stat.block_size() as u64;
    let bfree = stat.blocks_free() as u64;
    let bavail = stat.blocks_available() as u64;
    let blocks = stat.blocks() as u64;

    let reserved = match bfree.checked_sub(bavail) {
        Some(reserved) => reserved,
        None => {
            return Err(Error::other(format!(
                "detected f_bavail space ({}) > f_bfree space ({}), fs corruption at ({}). please run fsck",
                bavail,
                bfree,
                p.as_ref().display()
            )));
        }
    };

    let total = match blocks.checked_sub(reserved) {
        Some(total) => total * bsize,
        None => {
            return Err(Error::other(format!(
                "detected reserved space ({}) > blocks space ({}), fs corruption at ({}). please run fsck",
                reserved,
                blocks,
                p.as_ref().display()
            )));
        }
    };

    let free = bavail * bsize;
    let used = match total.checked_sub(free) {
        Some(used) => used,
        None => {
            return Err(Error::other(format!(
                "detected free space ({}) > total drive space ({}), fs corruption at ({}). please run fsck",
                free,
                total,
                p.as_ref().display()
            )));
        }
    };

    Ok(Info {
        total,
        free,
        used,
        files: stat.files(),
        ffree: stat.files_free(),
        fstype: stat.filesystem_type_name().to_string(),
        ..Default::default()
    })
}

pub fn same_disk(disk1: &str, disk2: &str) -> Result<bool> {
    let stat1 = stat(disk1)?;
    let stat2 = stat(disk2)?;

    Ok(stat1.st_dev == stat2.st_dev)
}

pub fn get_drive_stats(_major: u32, _minor: u32) -> Result<IOStats> {
    Ok(IOStats::default())
}
@@ -1,144 +0,0 @@
#![allow(unsafe_code)] // TODO: audit unsafe code

use super::IOStats;
use crate::disk::Info;
use std::io::{Error, ErrorKind, Result};
use std::mem;
use std::os::windows::ffi::OsStrExt;
use std::path::Path;
use winapi::shared::minwindef::{DWORD, MAX_PATH};
use winapi::shared::ntdef::ULARGE_INTEGER;
use winapi::um::fileapi::{GetDiskFreeSpaceExW, GetDiskFreeSpaceW, GetVolumeInformationW, GetVolumePathNameW};
use winapi::um::winnt::{LPCWSTR, WCHAR};

/// returns total and free bytes available in a directory, e.g. `C:\`.
pub fn get_info(p: impl AsRef<Path>) -> Result<Info> {
    let path_wide: Vec<WCHAR> = p
        .as_ref()
        .canonicalize()?
        .into_os_string()
        .encode_wide()
        .chain(std::iter::once(0)) // Null-terminate the string
        .collect();

    let mut lp_free_bytes_available: ULARGE_INTEGER = unsafe { mem::zeroed() };
    let mut lp_total_number_of_bytes: ULARGE_INTEGER = unsafe { mem::zeroed() };
    let mut lp_total_number_of_free_bytes: ULARGE_INTEGER = unsafe { mem::zeroed() };

    let success = unsafe {
        GetDiskFreeSpaceExW(
            path_wide.as_ptr(),
            &mut lp_free_bytes_available,
            &mut lp_total_number_of_bytes,
            &mut lp_total_number_of_free_bytes,
        )
    };
    if success == 0 {
        return Err(Error::last_os_error().into());
    }

    let total = unsafe { *lp_total_number_of_bytes.QuadPart() };
    let free = unsafe { *lp_total_number_of_free_bytes.QuadPart() };

    if free > total {
        return Err(Error::new(
            ErrorKind::Other,
            format!(
                "detected free space ({}) > total drive space ({}), fs corruption at ({}). please run 'fsck'",
                free,
                total,
                p.as_ref().display()
            ),
        )
        .into());
    }

    let mut lp_sectors_per_cluster: DWORD = 0;
    let mut lp_bytes_per_sector: DWORD = 0;
    let mut lp_number_of_free_clusters: DWORD = 0;
    let mut lp_total_number_of_clusters: DWORD = 0;

    let success = unsafe {
        GetDiskFreeSpaceW(
            path_wide.as_ptr(),
            &mut lp_sectors_per_cluster,
            &mut lp_bytes_per_sector,
            &mut lp_number_of_free_clusters,
            &mut lp_total_number_of_clusters,
        )
    };
    if success == 0 {
        return Err(Error::last_os_error().into());
    }

    Ok(Info {
        total,
        free,
        used: total - free,
        files: lp_total_number_of_clusters as u64,
        ffree: lp_number_of_free_clusters as u64,
        fstype: get_fs_type(&path_wide)?,
        ..Default::default()
    })
}

/// returns leading volume name.
fn get_volume_name(v: &[WCHAR]) -> Result<LPCWSTR> {
    let volume_name_size: DWORD = MAX_PATH as _;
    let mut lp_volume_name_buffer: [WCHAR; MAX_PATH] = [0; MAX_PATH];

    let success = unsafe { GetVolumePathNameW(v.as_ptr(), lp_volume_name_buffer.as_mut_ptr(), volume_name_size) };

    if success == 0 {
        return Err(Error::last_os_error().into());
    }

    Ok(lp_volume_name_buffer.as_ptr())
}

fn utf16_to_string(v: &[WCHAR]) -> String {
    let len = v.iter().position(|&x| x == 0).unwrap_or(v.len());
    String::from_utf16_lossy(&v[..len])
}
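
// A hedged sketch (not in the original file): utf16_to_string should decode
// up to the first NUL, mirroring how the WinAPI fills fixed-size buffers.
#[cfg(test)]
mod utf16_tests {
    #[test]
    fn test_utf16_to_string_stops_at_nul() {
        let buf: Vec<u16> = "NTFS\0junk".encode_utf16().collect();
        assert_eq!(super::utf16_to_string(&buf), "NTFS");
    }
}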

/// returns the filesystem type of the underlying mounted filesystem
fn get_fs_type(p: &[WCHAR]) -> Result<String> {
    let path = get_volume_name(p)?;

    let volume_name_size: DWORD = MAX_PATH as _;
    let n_file_system_name_size: DWORD = MAX_PATH as _;

    let mut lp_volume_serial_number: DWORD = 0;
    let mut lp_maximum_component_length: DWORD = 0;
    let mut lp_file_system_flags: DWORD = 0;

    let mut lp_volume_name_buffer: [WCHAR; MAX_PATH] = [0; MAX_PATH];
    let mut lp_file_system_name_buffer: [WCHAR; MAX_PATH] = [0; MAX_PATH];

    let success = unsafe {
        GetVolumeInformationW(
            path,
            lp_volume_name_buffer.as_mut_ptr(),
            volume_name_size,
            &mut lp_volume_serial_number,
            &mut lp_maximum_component_length,
            &mut lp_file_system_flags,
            lp_file_system_name_buffer.as_mut_ptr(),
            n_file_system_name_size,
        )
    };

    if success == 0 {
        return Err(Error::last_os_error().into());
    }

    Ok(utf16_to_string(&lp_file_system_name_buffer))
}

pub fn same_disk(_disk1: &str, _disk2: &str) -> Result<bool> {
    Ok(false)
}

pub fn get_drive_stats(_major: u32, _minor: u32) -> Result<IOStats> {
    Ok(IOStats::default())
}