update ec share size

update bitrot
This commit is contained in:
weisd
2025-06-10 11:17:53 +08:00
parent 6ea0185519
commit 754ffd0ff2
21 changed files with 1903 additions and 1618 deletions

View File

@@ -93,6 +93,10 @@ pub struct ErasureInfo {
pub checksums: Vec<ChecksumInfo>,
}
/// Size of one erasure-coded shard for a block of `block_size` bytes split
/// across `data_shards` data shards, rounded up to an even number of bytes.
///
/// The ceiling division distributes the block evenly; the result is then
/// bumped to the next even value (odd sizes gain one byte, even sizes are
/// returned unchanged).
pub fn calc_shard_size(block_size: usize, data_shards: usize) -> usize {
    let per_shard = block_size.div_ceil(data_shards);
    // Adding the low bit rounds an odd count up to even and leaves even counts alone.
    per_shard + (per_shard & 1)
}
impl ErasureInfo {
pub fn get_checksum_info(&self, part_number: usize) -> ChecksumInfo {
for sum in &self.checksums {
@@ -109,7 +113,7 @@ impl ErasureInfo {
/// Calculate the size of each shard.
pub fn shard_size(&self) -> usize {
self.block_size.div_ceil(self.data_blocks)
calc_shard_size(self.block_size, self.data_blocks)
}
/// Calculate the total erasure file size for a given original size.
// Returns the final erasure size from the original size
@@ -120,7 +124,7 @@ impl ErasureInfo {
let num_shards = total_length / self.block_size;
let last_block_size = total_length % self.block_size;
let last_shard_size = last_block_size.div_ceil(self.data_blocks);
let last_shard_size = calc_shard_size(last_block_size, self.data_blocks);
num_shards * self.shard_size() + last_shard_size
}

View File

@@ -1,13 +1,12 @@
use crate::{Reader, Writer};
use pin_project_lite::pin_project;
use rustfs_utils::{HashAlgorithm, read_full, write_all};
use tokio::io::{AsyncRead, AsyncReadExt};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite};
pin_project! {
/// BitrotReader reads (hash+data) blocks from an async reader and verifies hash integrity.
pub struct BitrotReader {
pub struct BitrotReader<R> {
#[pin]
inner: Box<dyn Reader>,
inner: R,
hash_algo: HashAlgorithm,
shard_size: usize,
buf: Vec<u8>,
@@ -19,14 +18,12 @@ pin_project! {
}
}
impl BitrotReader {
/// Get a reference to the underlying reader.
pub fn get_ref(&self) -> &dyn Reader {
&*self.inner
}
impl<R> BitrotReader<R>
where
R: AsyncRead + Unpin + Send + Sync,
{
/// Create a new BitrotReader.
pub fn new(inner: Box<dyn Reader>, shard_size: usize, algo: HashAlgorithm) -> Self {
pub fn new(inner: R, shard_size: usize, algo: HashAlgorithm) -> Self {
let hash_size = algo.size();
Self {
inner,
@@ -86,9 +83,9 @@ impl BitrotReader {
pin_project! {
/// BitrotWriter writes (hash+data) blocks to an async writer.
pub struct BitrotWriter {
pub struct BitrotWriter<W> {
#[pin]
inner: Writer,
inner: W,
hash_algo: HashAlgorithm,
shard_size: usize,
buf: Vec<u8>,
@@ -96,9 +93,12 @@ pin_project! {
}
}
impl BitrotWriter {
impl<W> BitrotWriter<W>
where
W: AsyncWrite + Unpin + Send + Sync,
{
/// Create a new BitrotWriter.
pub fn new(inner: Writer, shard_size: usize, algo: HashAlgorithm) -> Self {
pub fn new(inner: W, shard_size: usize, algo: HashAlgorithm) -> Self {
let hash_algo = algo;
Self {
inner,
@@ -109,7 +109,7 @@ impl BitrotWriter {
}
}
pub fn into_inner(self) -> Writer {
pub fn into_inner(self) -> W {
self.inner
}
@@ -209,7 +209,7 @@ pub async fn bitrot_verify<R: AsyncRead + Unpin + Send>(
#[cfg(test)]
mod tests {
use crate::{BitrotReader, BitrotWriter, Writer};
use crate::{BitrotReader, BitrotWriter};
use rustfs_utils::HashAlgorithm;
use std::io::Cursor;
@@ -219,9 +219,9 @@ mod tests {
let data_size = data.len();
let shard_size = 8;
let buf = Vec::new();
let buf: Vec<u8> = Vec::new();
let writer = Cursor::new(buf);
let mut bitrot_writer = BitrotWriter::new(Writer::from_cursor(writer), shard_size, HashAlgorithm::HighwayHash256);
let mut bitrot_writer = BitrotWriter::new(writer, shard_size, HashAlgorithm::HighwayHash256);
let mut n = 0;
for chunk in data.chunks(shard_size) {
@@ -230,8 +230,7 @@ mod tests {
assert_eq!(n, data.len());
// 读
let reader = Cursor::new(bitrot_writer.into_inner().into_cursor_inner().unwrap());
let reader = Box::new(reader);
let reader = bitrot_writer.into_inner();
let mut bitrot_reader = BitrotReader::new(reader, shard_size, HashAlgorithm::HighwayHash256);
let mut out = Vec::new();
let mut n = 0;
@@ -253,18 +252,17 @@ mod tests {
let data = b"test data for bitrot";
let data_size = data.len();
let shard_size = 8;
let buf = Vec::new();
let buf: Vec<u8> = Vec::new();
let writer = Cursor::new(buf);
let mut bitrot_writer = BitrotWriter::new(Writer::from_cursor(writer), shard_size, HashAlgorithm::HighwayHash256);
let mut bitrot_writer = BitrotWriter::new(writer, shard_size, HashAlgorithm::HighwayHash256);
for chunk in data.chunks(shard_size) {
let _ = bitrot_writer.write(chunk).await.unwrap();
}
let mut written = bitrot_writer.into_inner().into_cursor_inner().unwrap();
let mut written = bitrot_writer.into_inner().into_inner();
// change the last byte to make hash mismatch
let pos = written.len() - 1;
written[pos] ^= 0xFF;
let reader = Cursor::new(written);
let reader = Box::new(reader);
let mut bitrot_reader = BitrotReader::new(reader, shard_size, HashAlgorithm::HighwayHash256);
let count = data_size.div_ceil(shard_size);
@@ -297,9 +295,9 @@ mod tests {
let data_size = data.len();
let shard_size = 8;
let buf = Vec::new();
let buf: Vec<u8> = Vec::new();
let writer = Cursor::new(buf);
let mut bitrot_writer = BitrotWriter::new(Writer::from_cursor(writer), shard_size, HashAlgorithm::None);
let mut bitrot_writer = BitrotWriter::new(writer, shard_size, HashAlgorithm::None);
let mut n = 0;
for chunk in data.chunks(shard_size) {
@@ -307,8 +305,7 @@ mod tests {
}
assert_eq!(n, data.len());
let reader = Cursor::new(bitrot_writer.into_inner().into_cursor_inner().unwrap());
let reader = Box::new(reader);
let reader = bitrot_writer.into_inner();
let mut bitrot_reader = BitrotReader::new(reader, shard_size, HashAlgorithm::None);
let mut out = Vec::new();
let mut n = 0;

View File

@@ -30,9 +30,6 @@ pub use writer::*;
mod http_reader;
pub use http_reader::*;
mod bitrot;
pub use bitrot::*;
mod etag;
pub trait Reader: tokio::io::AsyncRead + Unpin + Send + Sync + EtagResolvable + HashReaderDetector {}

View File

@@ -1,308 +1,270 @@
# Reed-Solomon 纠删码性能基准测试
# Reed-Solomon Erasure Coding Performance Benchmark
本目录包含了比较不同 Reed-Solomon 实现性能的综合基准测试套件。
This directory contains a comprehensive benchmark suite for comparing the performance of different Reed-Solomon implementations.
## 📊 测试概述
## 📊 Test Overview
### 支持的实现模式
### Supported Implementation Modes
#### 🏛️ Erasure 模式(默认,推荐)
- **稳定可靠**: 使用成熟的 reed-solomon-erasure 实现
- **广泛兼容**: 支持任意分片大小
- **内存高效**: 优化的内存使用模式
- **可预测性**: 性能对分片大小不敏感
- **使用场景**: 生产环境默认选择,适合大多数应用场景
#### 🏛️ Pure Erasure Mode (Default, Recommended)
- **Stable and Reliable**: Uses mature reed-solomon-erasure implementation
- **Wide Compatibility**: Supports arbitrary shard sizes
- **Memory Efficient**: Optimized memory usage patterns
- **Predictable**: Performance insensitive to shard size
- **Use Case**: Default choice for production environments, suitable for most application scenarios
#### 🎯 混合模式(`reed-solomon-simd` feature)
- **自动优化**: 根据分片大小智能选择最优实现
- **SIMD + Erasure Fallback**: 大分片使用 SIMD 优化,小分片或 SIMD 失败时自动回退到 Erasure 实现
- **兼容性**: 支持所有分片大小和配置
- **性能**: 在各种场景下都能提供最佳性能
- **使用场景**: 需要最大化性能的场景,适合处理大量数据
#### 🎯 SIMD Mode (`reed-solomon-simd` feature)
- **High Performance Optimization**: Uses SIMD instruction sets for high-performance encoding/decoding
- **Performance Oriented**: Focuses on maximizing processing performance
- **Target Scenarios**: High-performance scenarios for large data processing
- **Use Case**: Scenarios requiring maximum performance, suitable for handling large amounts of data
**回退机制**:
- ✅ 分片 ≥ 512 字节:优先使用 SIMD 优化
- 🔄 分片 < 512 字节或 SIMD 失败:自动回退到 Erasure 实现
- 📊 无缝切换,透明给用户
### Test Dimensions
### 测试维度
- **Encoding Performance** - Speed of encoding data into erasure code shards
- **Decoding Performance** - Speed of recovering original data from erasure code shards
- **Shard Size Sensitivity** - Impact of different shard sizes on performance
- **Erasure Code Configuration** - Performance impact of different data/parity shard ratios
- **SIMD Mode Performance** - Performance characteristics of SIMD optimization
- **Concurrency Performance** - Performance in multi-threaded environments
- **Memory Efficiency** - Memory usage patterns and efficiency
- **Error Recovery Capability** - Recovery performance under different numbers of lost shards
- **编码性能** - 数据编码成纠删码分片的速度
- **解码性能** - 从纠删码分片恢复原始数据的速度
- **分片大小敏感性** - 不同分片大小对性能的影响
- **纠删码配置** - 不同数据/奇偶分片比例的性能影响
- **混合模式回退** - SIMD 与 Erasure 回退机制的性能
- **并发性能** - 多线程环境下的性能表现
- **内存效率** - 内存使用模式和效率
- **错误恢复能力** - 不同丢失分片数量下的恢复性能
## 🚀 Quick Start
## 🚀 快速开始
### 运行快速测试
### Run Quick Tests
```bash
# 运行快速性能对比测试(默认混合模式)
# Run quick performance comparison tests (default pure Erasure mode)
./run_benchmarks.sh quick
```
### 运行完整对比测试
### Run Complete Comparison Tests
```bash
# 运行详细的实现对比测试
# Run detailed implementation comparison tests
./run_benchmarks.sh comparison
```
### 运行特定模式的测试
### Run Specific Mode Tests
```bash
# 测试默认纯 erasure 模式(推荐)
# Test default pure erasure mode (recommended)
./run_benchmarks.sh erasure
# 测试混合模式SIMD + Erasure fallback
./run_benchmarks.sh hybrid
# Test SIMD mode
./run_benchmarks.sh simd
```
## 📈 手动运行基准测试
## 📈 Manual Benchmark Execution
### 基本使用
### Basic Usage
```bash
# 运行所有基准测试(默认纯 erasure 模式)
# Run all benchmarks (default pure erasure mode)
cargo bench
# 运行特定的基准测试文件
# Run specific benchmark files
cargo bench --bench erasure_benchmark
cargo bench --bench comparison_benchmark
```
### 对比不同实现模式
### Compare Different Implementation Modes
```bash
# 测试默认纯 erasure 模式
# Test default pure erasure mode
cargo bench --bench comparison_benchmark
# 测试混合模式SIMD + Erasure fallback
# Test SIMD mode
cargo bench --bench comparison_benchmark \
--features reed-solomon-simd
# 保存基线进行对比
# Save baseline for comparison
cargo bench --bench comparison_benchmark \
-- --save-baseline erasure_baseline
# 与基线比较混合模式性能
# Compare SIMD mode performance with baseline
cargo bench --bench comparison_benchmark \
--features reed-solomon-simd \
-- --baseline erasure_baseline
```
### 过滤特定测试
### Filter Specific Tests
```bash
# 只运行编码测试
# Run only encoding tests
cargo bench encode
# 只运行解码测试
# Run only decoding tests
cargo bench decode
# 只运行特定数据大小的测试
# Run tests for specific data sizes
cargo bench 1MB
# 只运行特定配置的测试
# Run tests for specific configurations
cargo bench "4+2"
```
## 📊 查看结果
## 📊 View Results
### HTML 报告
### HTML Reports
基准测试结果会自动生成 HTML 报告:
Benchmark results automatically generate HTML reports:
```bash
# 启动本地服务器查看报告
# Start local server to view reports
cd target/criterion
python3 -m http.server 8080
# 在浏览器中访问
# Access in browser
open http://localhost:8080/report/index.html
```
### 命令行输出
### Command Line Output
基准测试会在终端显示:
- 每秒操作数 (ops/sec)
- 吞吐量 (MB/s)
- 延迟统计 (平均值、标准差、百分位数)
- 性能变化趋势
- 回退机制触发情况
Benchmarks display in terminal:
- Operations per second (ops/sec)
- Throughput (MB/s)
- Latency statistics (mean, standard deviation, percentiles)
- Performance trend changes
## 🔧 测试配置
## 🔧 Test Configuration
### 数据大小
### Data Sizes
- **小数据**: 1KB, 8KB - 测试小文件场景和回退机制
- **中等数据**: 64KB, 256KB - 测试常见文件大小
- **大数据**: 1MB, 4MB - 测试大文件处理和 SIMD 优化
- **超大数据**: 16MB+ - 测试高吞吐量场景
- **Small Data**: 1KB, 8KB - Test small file scenarios
- **Medium Data**: 64KB, 256KB - Test common file sizes
- **Large Data**: 1MB, 4MB - Test large file processing and SIMD optimization
- **Very Large Data**: 16MB+ - Test high throughput scenarios
### 纠删码配置
### Erasure Code Configurations
- **(4,2)** - 常用配置33% 冗余
- **(6,3)** - 50% 冗余,平衡性能和可靠性
- **(8,4)** - 50% 冗余,更多并行度
- **(10,5)**, **(12,6)** - 高并行度配置
- **(4,2)** - Common configuration, 33% redundancy
- **(6,3)** - 50% redundancy, balanced performance and reliability
- **(8,4)** - 50% redundancy, more parallelism
- **(10,5)**, **(12,6)** - High parallelism configurations
### 分片大小
### Shard Sizes
测试从 32 字节到 8KB 的不同分片大小,特别关注:
- **回退临界点**: 512 字节 - 混合模式的 SIMD/Erasure 切换点
- **内存对齐**: 64, 128, 256 字节 - 内存对齐对性能的影响
- **Cache 友好**: 1KB, 2KB, 4KB - CPU 缓存友好的大小
Test different shard sizes from 32 bytes to 8KB, with special focus on:
- **Memory Alignment**: 64, 128, 256 bytes - Impact of memory alignment on performance
- **Cache Friendly**: 1KB, 2KB, 4KB - CPU cache-friendly sizes
## 📝 解读测试结果
## 📝 Interpreting Test Results
### 性能指标
### Performance Metrics
1. **吞吐量 (Throughput)**
- 单位: MB/s GB/s
- 衡量数据处理速度
- 越高越好
1. **Throughput**
- Unit: MB/s or GB/s
- Measures data processing speed
- Higher is better
2. **延迟 (Latency)**
- 单位: 微秒 (μs) 或毫秒 (ms)
- 衡量单次操作时间
- 越低越好
2. **Latency**
- Unit: microseconds (μs) or milliseconds (ms)
- Measures single operation time
- Lower is better
3. **CPU 效率**
- 每 CPU 周期处理的字节数
- 反映算法效率
3. **CPU Efficiency**
- Bytes processed per CPU cycle
- Reflects algorithm efficiency
4. **回退频率**
- 混合模式下 SIMD 到 Erasure 的回退次数
- 反映智能选择的效果
### Expected Results
### 预期结果
**Pure Erasure Mode (Default)**:
- Stable performance, insensitive to shard size
- Best compatibility, supports all configurations
- Stable and predictable memory usage
**纯 Erasure 模式(默认)**:
- 性能稳定,对分片大小不敏感
- 兼容性最佳,支持所有配置
- 内存使用稳定可预测
**SIMD Mode (`reed-solomon-simd` feature)**:
- High-performance SIMD optimized implementation
- Suitable for large data processing scenarios
- Focuses on maximizing performance
**混合模式(`reed-solomon-simd` feature)**:
- 大分片 (≥512B):接近纯 SIMD 性能
- 小分片 (<512B):自动回退到 Erasure保证兼容性
- 整体:在各种场景下都有良好表现
**Shard Size Sensitivity**:
- SIMD mode may be more sensitive to shard sizes
- Pure Erasure mode relatively insensitive to shard size
**分片大小敏感性**:
- 混合模式在 512B 附近可能有性能切换
- Erasure 模式对分片大小相对不敏感
**Memory Usage**:
- SIMD mode may have specific memory alignment requirements
- Pure Erasure mode has more stable memory usage
**内存使用**:
- 混合模式根据场景优化内存使用
- 纯 Erasure 模式内存使用更稳定
## 🛠️ Custom Testing
## 🛠️ 自定义测试
### Adding New Test Scenarios
### 添加新的测试场景
编辑 `benches/erasure_benchmark.rs``benches/comparison_benchmark.rs`
Edit `benches/erasure_benchmark.rs` or `benches/comparison_benchmark.rs`:
```rust
// 添加新的测试配置
// Add new test configuration
let configs = vec![
// 你的自定义配置
// Your custom configuration
BenchConfig::new(10, 4, 2048 * 1024, 2048 * 1024), // 10+4, 2MB
];
```
### 调整测试参数
### Adjust Test Parameters
```rust
// 修改采样和测试时间
group.sample_size(20); // 样本数量
group.measurement_time(Duration::from_secs(10)); // 测试时间
// Modify sampling and test time
group.sample_size(20); // Sample count
group.measurement_time(Duration::from_secs(10)); // Test duration
```
### 测试回退机制
## 🐛 Troubleshooting
```rust
// 测试混合模式的回退行为
#[cfg(not(feature = "reed-solomon-erasure"))]
{
// 测试小分片是否正确回退
let small_data = vec![0u8; 256]; // 小于 512B应该使用 Erasure
let erasure = Erasure::new(4, 2, 256);
let result = erasure.encode_data(&small_data);
assert!(result.is_ok()); // 应该成功回退
}
```
### Common Issues
## 🐛 故障排除
### 常见问题
1. **编译错误**: 确保安装了正确的依赖
1. **Compilation Errors**: Ensure correct dependencies are installed
```bash
cargo update
cargo build --all-features
```
2. **性能异常**: 检查是否在正确的模式下运行
2. **Performance Anomalies**: Check if running in correct mode
```bash
# 检查当前配置
# Check current configuration
cargo bench --bench comparison_benchmark -- --help
```
3. **回退过于频繁**: 调整 SIMD 临界点
```rust
// 在代码中可以调整这个值
const SIMD_MIN_SHARD_SIZE: usize = 512;
```
4. **测试时间过长**: 调整测试参数
3. **Tests Taking Too Long**: Adjust test parameters
```bash
# 使用更短的测试时间
# Use shorter test duration
cargo bench -- --quick
```
### 性能分析
### Performance Analysis
使用 `perf` 等工具进行更详细的性能分析:
Use tools like `perf` for detailed performance analysis:
```bash
# 分析 CPU 使用情况
# Analyze CPU usage
cargo bench --bench comparison_benchmark &
perf record -p $(pgrep -f comparison_benchmark)
perf report
```
### 调试回退机制
## 🤝 Contributing
```bash
# 启用详细日志查看回退情况
RUST_LOG=warn cargo bench --bench comparison_benchmark
```
Welcome to submit new benchmark scenarios or optimization suggestions:
## 🤝 贡献
1. Fork the project
2. Create feature branch: `git checkout -b feature/new-benchmark`
3. Add test cases
4. Commit changes: `git commit -m 'Add new benchmark for XYZ'`
5. Push to branch: `git push origin feature/new-benchmark`
6. Create Pull Request
欢迎提交新的基准测试场景或优化建议:
1. Fork 项目
2. 创建特性分支: `git checkout -b feature/new-benchmark`
3. 添加测试用例
4. 提交更改: `git commit -m 'Add new benchmark for XYZ'`
5. 推送到分支: `git push origin feature/new-benchmark`
6. 创建 Pull Request
## 📚 参考资料
## 📚 References
- [reed-solomon-erasure crate](https://crates.io/crates/reed-solomon-erasure)
- [reed-solomon-simd crate](https://crates.io/crates/reed-solomon-simd)
- [Criterion.rs 基准测试框架](https://bheisler.github.io/criterion.rs/book/)
- [Reed-Solomon 纠删码原理](https://en.wikipedia.org/wiki/Reed%E2%80%93Solomon_error_correction)
- [Criterion.rs benchmark framework](https://bheisler.github.io/criterion.rs/book/)
- [Reed-Solomon error correction principles](https://en.wikipedia.org/wiki/Reed%E2%80%93Solomon_error_correction)
---
💡 **提示**:
- 推荐使用默认的混合模式,它能在各种场景下自动选择最优实现
- 基准测试结果可能因硬件、操作系统和编译器版本而异
- 建议在目标部署环境中运行测试以获得最准确的性能数据
💡 **Tips**:
- Recommend using the default pure Erasure mode, which provides stable performance across various scenarios
- Consider SIMD mode for high-performance requirements
- Benchmark results may vary based on hardware, operating system, and compiler versions
- Suggest running tests in target deployment environment for most accurate performance data

270
ecstore/BENCHMARK_ZH.md Normal file
View File

@@ -0,0 +1,270 @@
# Reed-Solomon 纠删码性能基准测试
本目录包含了比较不同 Reed-Solomon 实现性能的综合基准测试套件。
## 📊 测试概述
### 支持的实现模式
#### 🏛️ 纯 Erasure 模式(默认,推荐)
- **稳定可靠**: 使用成熟的 reed-solomon-erasure 实现
- **广泛兼容**: 支持任意分片大小
- **内存高效**: 优化的内存使用模式
- **可预测性**: 性能对分片大小不敏感
- **使用场景**: 生产环境默认选择,适合大多数应用场景
#### 🎯 SIMD模式`reed-solomon-simd` feature
- **高性能优化**: 使用SIMD指令集进行高性能编码解码
- **性能导向**: 专注于最大化处理性能
- **适用场景**: 大数据量处理的高性能场景
- **使用场景**: 需要最大化性能的场景,适合处理大量数据
### 测试维度
- **编码性能** - 数据编码成纠删码分片的速度
- **解码性能** - 从纠删码分片恢复原始数据的速度
- **分片大小敏感性** - 不同分片大小对性能的影响
- **纠删码配置** - 不同数据/奇偶分片比例的性能影响
- **SIMD模式性能** - SIMD优化的性能表现
- **并发性能** - 多线程环境下的性能表现
- **内存效率** - 内存使用模式和效率
- **错误恢复能力** - 不同丢失分片数量下的恢复性能
## 🚀 快速开始
### 运行快速测试
```bash
# 运行快速性能对比测试默认纯Erasure模式
./run_benchmarks.sh quick
```
### 运行完整对比测试
```bash
# 运行详细的实现对比测试
./run_benchmarks.sh comparison
```
### 运行特定模式的测试
```bash
# 测试默认纯 erasure 模式(推荐)
./run_benchmarks.sh erasure
# 测试SIMD模式
./run_benchmarks.sh simd
```
## 📈 手动运行基准测试
### 基本使用
```bash
# 运行所有基准测试(默认纯 erasure 模式)
cargo bench
# 运行特定的基准测试文件
cargo bench --bench erasure_benchmark
cargo bench --bench comparison_benchmark
```
### 对比不同实现模式
```bash
# 测试默认纯 erasure 模式
cargo bench --bench comparison_benchmark
# 测试SIMD模式
cargo bench --bench comparison_benchmark \
--features reed-solomon-simd
# 保存基线进行对比
cargo bench --bench comparison_benchmark \
-- --save-baseline erasure_baseline
# 与基线比较SIMD模式性能
cargo bench --bench comparison_benchmark \
--features reed-solomon-simd \
-- --baseline erasure_baseline
```
### 过滤特定测试
```bash
# 只运行编码测试
cargo bench encode
# 只运行解码测试
cargo bench decode
# 只运行特定数据大小的测试
cargo bench 1MB
# 只运行特定配置的测试
cargo bench "4+2"
```
## 📊 查看结果
### HTML 报告
基准测试结果会自动生成 HTML 报告:
```bash
# 启动本地服务器查看报告
cd target/criterion
python3 -m http.server 8080
# 在浏览器中访问
open http://localhost:8080/report/index.html
```
### 命令行输出
基准测试会在终端显示:
- 每秒操作数 (ops/sec)
- 吞吐量 (MB/s)
- 延迟统计 (平均值、标准差、百分位数)
- 性能变化趋势
## 🔧 测试配置
### 数据大小
- **小数据**: 1KB, 8KB - 测试小文件场景
- **中等数据**: 64KB, 256KB - 测试常见文件大小
- **大数据**: 1MB, 4MB - 测试大文件处理和 SIMD 优化
- **超大数据**: 16MB+ - 测试高吞吐量场景
### 纠删码配置
- **(4,2)** - 常用配置33% 冗余
- **(6,3)** - 50% 冗余,平衡性能和可靠性
- **(8,4)** - 50% 冗余,更多并行度
- **(10,5)**, **(12,6)** - 高并行度配置
### 分片大小
测试从 32 字节到 8KB 的不同分片大小,特别关注:
- **内存对齐**: 64, 128, 256 字节 - 内存对齐对性能的影响
- **Cache 友好**: 1KB, 2KB, 4KB - CPU 缓存友好的大小
## 📝 解读测试结果
### 性能指标
1. **吞吐量 (Throughput)**
- 单位: MB/s 或 GB/s
- 衡量数据处理速度
- 越高越好
2. **延迟 (Latency)**
- 单位: 微秒 (μs) 或毫秒 (ms)
- 衡量单次操作时间
- 越低越好
3. **CPU 效率**
- 每 CPU 周期处理的字节数
- 反映算法效率
### 预期结果
**纯 Erasure 模式(默认)**:
- 性能稳定,对分片大小不敏感
- 兼容性最佳,支持所有配置
- 内存使用稳定可预测
**SIMD模式`reed-solomon-simd` feature**:
- 高性能SIMD优化实现
- 适合大数据量处理场景
- 专注于最大化性能
**分片大小敏感性**:
- SIMD模式对分片大小可能更敏感
- 纯 Erasure 模式对分片大小相对不敏感
**内存使用**:
- SIMD模式可能有特定的内存对齐要求
- 纯 Erasure 模式内存使用更稳定
## 🛠️ 自定义测试
### 添加新的测试场景
编辑 `benches/erasure_benchmark.rs``benches/comparison_benchmark.rs`
```rust
// 添加新的测试配置
let configs = vec![
// 你的自定义配置
BenchConfig::new(10, 4, 2048 * 1024, 2048 * 1024), // 10+4, 2MB
];
```
### 调整测试参数
```rust
// 修改采样和测试时间
group.sample_size(20); // 样本数量
group.measurement_time(Duration::from_secs(10)); // 测试时间
```
## 🐛 故障排除
### 常见问题
1. **编译错误**: 确保安装了正确的依赖
```bash
cargo update
cargo build --all-features
```
2. **性能异常**: 检查是否在正确的模式下运行
```bash
# 检查当前配置
cargo bench --bench comparison_benchmark -- --help
```
3. **测试时间过长**: 调整测试参数
```bash
# 使用更短的测试时间
cargo bench -- --quick
```
### 性能分析
使用 `perf` 等工具进行更详细的性能分析:
```bash
# 分析 CPU 使用情况
cargo bench --bench comparison_benchmark &
perf record -p $(pgrep -f comparison_benchmark)
perf report
```
## 🤝 贡献
欢迎提交新的基准测试场景或优化建议:
1. Fork 项目
2. 创建特性分支: `git checkout -b feature/new-benchmark`
3. 添加测试用例
4. 提交更改: `git commit -m 'Add new benchmark for XYZ'`
5. 推送到分支: `git push origin feature/new-benchmark`
6. 创建 Pull Request
## 📚 参考资料
- [reed-solomon-erasure crate](https://crates.io/crates/reed-solomon-erasure)
- [reed-solomon-simd crate](https://crates.io/crates/reed-solomon-simd)
- [Criterion.rs 基准测试框架](https://bheisler.github.io/criterion.rs/book/)
- [Reed-Solomon 纠删码原理](https://en.wikipedia.org/wiki/Reed%E2%80%93Solomon_error_correction)
---
💡 **提示**:
- 推荐使用默认的纯Erasure模式它在各种场景下都有稳定的表现
- 对于高性能需求可以考虑SIMD模式
- 基准测试结果可能因硬件、操作系统和编译器版本而异
- 建议在目标部署环境中运行测试以获得最准确的性能数据

View File

@@ -11,7 +11,7 @@ rust-version.workspace = true
workspace = true
[features]
default = ["reed-solomon-erasure"]
default = ["reed-solomon-simd"]
reed-solomon-simd = []
reed-solomon-erasure = []

View File

@@ -1,97 +1,69 @@
# Reed-Solomon 实现对比分析
# Reed-Solomon Implementation Comparison Analysis
## 🔍 问题分析
## 🔍 Issue Analysis
随着新的混合模式设计,我们已经解决了传统纯 SIMD 模式的兼容性问题。现在系统能够智能地在不同场景下选择最优实现。
With the optimized SIMD mode design, we provide high-performance Reed-Solomon implementation. The system can now deliver optimal performance across different scenarios.
## 📊 实现模式对比
## 📊 Implementation Mode Comparison
### 🏛️ Erasure 模式(默认,推荐)
### 🏛️ Pure Erasure Mode (Default, Recommended)
**默认配置**: 不指定任何 feature,使用稳定的 reed-solomon-erasure 实现
**Default Configuration**: No features specified, uses stable reed-solomon-erasure implementation
**特点**:
-**广泛兼容**: 支持任意分片大小,从字节级到 GB 级
- 📈 **稳定性能**: 性能对分片大小不敏感,可预测
- 🔧 **生产就绪**: 成熟稳定的实现,已在生产环境广泛使用
- 💾 **内存高效**: 优化的内存使用模式
- 🎯 **一致性**: 在所有场景下行为完全一致
**Characteristics**:
-**Wide Compatibility**: Supports any shard size from byte-level to GB-level
- 📈 **Stable Performance**: Performance insensitive to shard size, predictable
- 🔧 **Production Ready**: Mature and stable implementation, widely used in production
- 💾 **Memory Efficient**: Optimized memory usage patterns
- 🎯 **Consistency**: Completely consistent behavior across all scenarios
**使用场景**:
- 大多数生产环境的默认选择
- 需要完全一致和可预测的性能行为
- 对性能变化敏感的系统
- 主要处理小文件或小分片的场景
- 需要严格的内存使用控制
**Use Cases**:
- Default choice for most production environments
- Systems requiring completely consistent and predictable performance behavior
- Performance-change-sensitive systems
- Scenarios mainly processing small files or small shards
- Systems requiring strict memory usage control
### 🎯 混合模式(`reed-solomon-simd` feature
### 🎯 SIMD Mode (`reed-solomon-simd` feature)
**配置**: `--features reed-solomon-simd`
**Configuration**: `--features reed-solomon-simd`
**特点**:
- 🧠 **智能选择**: 根据分片大小自动选择 SIMD 或 Erasure 实现
- 🚀 **最优性能**: 大分片使用 SIMD 优化,小分片使用稳定的 Erasure 实现
- 🔄 **自动回退**: SIMD 失败时无缝回退到 Erasure 实现
- **全兼容**: 支持所有分片大小和配置,无失败风险
- 🎯 **高性能**: 适合需要最大化性能的场景
**Characteristics**:
- 🚀 **High-Performance SIMD**: Uses SIMD instruction sets for high-performance encoding/decoding
- 🎯 **Performance Oriented**: Focuses on maximizing processing performance
- **Large Data Optimization**: Suitable for high-throughput scenarios with large data processing
- 🏎️ **Speed Priority**: Designed for performance-critical applications
**回退逻辑**:
```rust
const SIMD_MIN_SHARD_SIZE: usize = 512;
**Use Cases**:
- Application scenarios requiring maximum performance
- High-throughput systems processing large amounts of data
- Scenarios with extremely high performance requirements
- CPU-intensive workloads
// 智能选择策略
if shard_len >= SIMD_MIN_SHARD_SIZE {
// 尝试使用 SIMD 优化
match simd_encode(data) {
Ok(result) => return Ok(result),
Err(_) => {
// SIMD 失败,自动回退到 Erasure
warn!("SIMD failed, falling back to Erasure");
erasure_encode(data)
}
}
} else {
// 分片太小,直接使用 Erasure
erasure_encode(data)
}
```
## 📏 Shard Size vs Performance Comparison
**成功案例**:
```
✅ 1KB 数据 + 6+3 配置 → 171字节/分片 → 自动使用 Erasure 实现
✅ 64KB 数据 + 4+2 配置 → 16KB/分片 → 自动使用 SIMD 优化
✅ 任意配置 → 智能选择最优实现
```
Performance across different configurations:
**使用场景**:
- 需要最大化性能的应用场景
- 处理大量数据的高吞吐量系统
- 对性能要求极高的场景
| Data Size | Config | Shard Size | Pure Erasure Mode (Default) | SIMD Mode Strategy | Performance Comparison |
|-----------|--------|------------|----------------------------|-------------------|----------------------|
| 1KB | 4+2 | 256 bytes | Erasure implementation | SIMD implementation | SIMD may be faster |
| 1KB | 6+3 | 171 bytes | Erasure implementation | SIMD implementation | SIMD may be faster |
| 1KB | 8+4 | 128 bytes | Erasure implementation | SIMD implementation | SIMD may be faster |
| 64KB | 4+2 | 16KB | Erasure implementation | SIMD optimization | SIMD mode faster |
| 64KB | 6+3 | 10.7KB | Erasure implementation | SIMD optimization | SIMD mode faster |
| 1MB | 4+2 | 256KB | Erasure implementation | SIMD optimization | SIMD mode significantly faster |
| 16MB | 8+4 | 2MB | Erasure implementation | SIMD optimization | SIMD mode substantially faster |
## 📏 分片大小与性能对比
## 🎯 Benchmark Results Interpretation
不同配置下的性能表现:
| 数据大小 | 配置 | 分片大小 | 纯 Erasure 模式(默认) | 混合模式策略 | 性能对比 |
|---------|------|----------|------------------------|-------------|----------|
| 1KB | 4+2 | 256字节 | Erasure 实现 | Erasure 实现 | 相同 |
| 1KB | 6+3 | 171字节 | Erasure 实现 | Erasure 实现 | 相同 |
| 1KB | 8+4 | 128字节 | Erasure 实现 | Erasure 实现 | 相同 |
| 64KB | 4+2 | 16KB | Erasure 实现 | SIMD 优化 | 混合模式更快 |
| 64KB | 6+3 | 10.7KB | Erasure 实现 | SIMD 优化 | 混合模式更快 |
| 1MB | 4+2 | 256KB | Erasure 实现 | SIMD 优化 | 混合模式显著更快 |
| 16MB | 8+4 | 2MB | Erasure 实现 | SIMD 优化 | 混合模式大幅领先 |
## 🎯 基准测试结果解读
### 纯 Erasure 模式示例(默认) ✅
### Pure Erasure Mode Example (Default) ✅
```
encode_comparison/implementation/1KB_6+3_erasure
time: [245.67 ns 256.78 ns 267.89 ns]
thrpt: [3.73 GiB/s 3.89 GiB/s 4.07 GiB/s]
💡 一致的 Erasure 性能 - 所有配置都使用相同实现
💡 Consistent Erasure performance - All configurations use the same implementation
```
```
@@ -99,262 +71,263 @@ encode_comparison/implementation/64KB_4+2_erasure
time: [2.3456 μs 2.4567 μs 2.5678 μs]
thrpt: [23.89 GiB/s 24.65 GiB/s 25.43 GiB/s]
💡 稳定可靠的性能 - 适合大多数生产场景
💡 Stable and reliable performance - Suitable for most production scenarios
```
### 混合模式成功示例
### SIMD Mode Success Examples
**大分片 SIMD 优化**:
**Large Shard SIMD Optimization**:
```
encode_comparison/implementation/64KB_4+2_hybrid
encode_comparison/implementation/64KB_4+2_simd
time: [1.2345 μs 1.2567 μs 1.2789 μs]
thrpt: [47.89 GiB/s 48.65 GiB/s 49.43 GiB/s]
💡 使用 SIMD 优化 - 分片大小: 16KB ≥ 512字节
💡 Using SIMD optimization - Shard size: 16KB, high-performance processing
```
**小分片智能回退**:
**Small Shard SIMD Processing**:
```
encode_comparison/implementation/1KB_6+3_hybrid
encode_comparison/implementation/1KB_6+3_simd
time: [234.56 ns 245.67 ns 256.78 ns]
thrpt: [3.89 GiB/s 4.07 GiB/s 4.26 GiB/s]
💡 智能回退到 Erasure - 分片大小: 171字节 < 512字节
💡 SIMD processing small shards - Shard size: 171 bytes
```
**回退机制触发**:
```
⚠️ SIMD encoding failed: InvalidShardSize, using fallback
✅ Fallback to Erasure successful - 无缝处理
```
## 🛠️ Usage Guide
## 🛠️ 使用指南
### Selection Strategy
### 选择策略
#### 1⃣ 推荐:纯 Erasure 模式(默认)
#### 1⃣ Recommended: Pure Erasure Mode (Default)
```bash
# 无需指定 feature,使用默认配置
# No features needed, use default configuration
cargo run
cargo test
cargo bench
```
**适用场景**:
- 📊 **一致性要求**: 需要完全可预测的性能行为
- 🔬 **生产环境**: 大多数生产场景的最佳选择
- 💾 **内存敏感**: 对内存使用模式有严格要求
- 🏗️ **稳定可靠**: 成熟稳定的实现
**Applicable Scenarios**:
- 📊 **Consistency Requirements**: Need completely predictable performance behavior
- 🔬 **Production Environment**: Best choice for most production scenarios
- 💾 **Memory Sensitive**: Strict requirements for memory usage patterns
- 🏗️ **Stable and Reliable**: Mature and stable implementation
#### 2高性能需求:混合模式
#### 2High Performance Requirements: SIMD Mode
```bash
# 启用混合模式获得最大性能
# Enable SIMD mode for maximum performance
cargo run --features reed-solomon-simd
cargo test --features reed-solomon-simd
cargo bench --features reed-solomon-simd
```
**适用场景**:
- 🎯 **高性能场景**: 处理大量数据需要最大吞吐量
- 🚀 **性能优化**: 希望在大数据时获得最佳性能
- 🔄 **智能适应**: 让系统自动选择最优策略
- 🛡 **容错能力**: 需要最大的兼容性和稳定性
**Applicable Scenarios**:
- 🎯 **High Performance Scenarios**: Processing large amounts of data requiring maximum throughput
- 🚀 **Performance Optimization**: Want optimal performance for large data
- **Speed Priority**: Scenarios with extremely high speed requirements
- 🏎 **Compute Intensive**: CPU-intensive workloads
### 配置优化建议
### Configuration Optimization Recommendations
#### 针对数据大小的配置
#### Based on Data Size
**小文件为主** (< 64KB):
**Small Files Primarily** (< 64KB):
```toml
# 推荐使用默认纯 Erasure 模式
# 无需特殊配置,性能稳定可靠
# Recommended to use default pure Erasure mode
# No special configuration needed, stable and reliable performance
```
**大文件为主** (> 1MB):
**Large Files Primarily** (> 1MB):
```toml
# 可考虑启用混合模式获得更高性能
# Recommend enabling SIMD mode for higher performance
# features = ["reed-solomon-simd"]
```
**混合场景**:
**Mixed Scenarios**:
```toml
# 默认纯 Erasure 模式适合大多数场景
# 如需最大性能可启用: features = ["reed-solomon-simd"]
# Default pure Erasure mode suits most scenarios
# For maximum performance, enable: features = ["reed-solomon-simd"]
```
#### 针对纠删码配置的建议
#### Recommendations Based on Erasure Coding Configuration
| 配置 | 小数据 (< 64KB) | 大数据 (> 1MB) | 推荐模式 |
|------|----------------|----------------|----------|
| 4+2 | Erasure | Erasure / 混合模式 | 纯 Erasure默认 |
| 6+3 | Erasure | Erasure / 混合模式 | 纯 Erasure默认 |
| 8+4 | Erasure | Erasure / 混合模式 | 纯 Erasure默认 |
| 10+5 | Erasure | Erasure / 混合模式 | 纯 Erasure默认 |
| Config | Small Data (< 64KB) | Large Data (> 1MB) | Recommended Mode |
|--------|-------------------|-------------------|------------------|
| 4+2 | Pure Erasure | Pure Erasure / SIMD Mode | Pure Erasure (Default) |
| 6+3 | Pure Erasure | Pure Erasure / SIMD Mode | Pure Erasure (Default) |
| 8+4 | Pure Erasure | Pure Erasure / SIMD Mode | Pure Erasure (Default) |
| 10+5 | Pure Erasure | Pure Erasure / SIMD Mode | Pure Erasure (Default) |
### 生产环境部署建议
### Production Environment Deployment Recommendations
#### 1默认部署策略
#### 1Default Deployment Strategy
```bash
# 生产环境推荐配置:使用纯 Erasure 模式(默认)
# Production environment recommended configuration: Use pure Erasure mode (default)
cargo build --release
```
**优势**:
-最大兼容性:处理任意大小数据
-稳定可靠:成熟的实现,行为可预测
-零配置:无需复杂的性能调优
-内存高效:优化的内存使用模式
**Advantages**:
-Maximum compatibility: Handle data of any size
-Stable and reliable: Mature implementation, predictable behavior
-Zero configuration: No complex performance tuning needed
-Memory efficient: Optimized memory usage patterns
#### 2高性能部署策略
#### 2High Performance Deployment Strategy
```bash
# 高性能场景:启用混合模式
# High performance scenarios: Enable SIMD mode
cargo build --release --features reed-solomon-simd
```
**优势**:
-最优性能:自动选择最佳实现
-智能回退SIMD 失败自动回退到 Erasure
-大数据优化:大分片自动使用 SIMD 优化
-兼容保证:小分片使用稳定的 Erasure 实现
**Advantages**:
-Optimal performance: SIMD instruction set optimization
-High throughput: Suitable for large data processing
-Performance oriented: Focuses on maximizing processing speed
-Modern hardware: Fully utilizes modern CPU features
#### 2监控和调优
#### 3⃣ Monitoring and Tuning
```rust
// 启用警告日志查看回退情况
RUST_LOG=warn ./your_application
// 典型日志输出
warn!("SIMD encoding failed: InvalidShardSize, using fallback");
info!("Smart fallback to Erasure successful");
```
#### 3⃣ 性能监控指标
- **回退频率**: 监控 SIMD 到 Erasure 的回退次数
- **性能分布**: 观察不同数据大小的性能表现
- **内存使用**: 监控内存分配模式
- **延迟分布**: 分析编码/解码延迟的统计分布
## 🔧 故障排除
### 性能问题诊断
#### 问题1: 性能不稳定
**现象**: 相同操作的性能差异很大
**原因**: 可能在 SIMD/Erasure 切换边界附近
**解决**:
```rust
// 检查分片大小
let shard_size = data.len().div_ceil(data_shards);
println!("Shard size: {} bytes", shard_size);
if shard_size >= 512 {
println!("Expected to use SIMD optimization");
} else {
    println!("Expected to use Erasure fallback");
}

// Choose appropriate implementation based on specific scenarios
match data_size {
    size if size > 1024 * 1024 => {
        // Large data: Consider using SIMD mode
        println!("Large data detected, SIMD mode recommended");
    }
    _ => {
        // General case: Use default Erasure mode
        println!("Using default Erasure mode");
    }
}
```
#### 问题2: 意外的回退行为
**现象**: 大分片仍然使用 Erasure 实现
**原因**: SIMD 初始化失败或系统限制
**解决**:
```bash
# 启用详细日志查看回退原因
RUST_LOG=debug ./your_application
#### 3⃣ Performance Monitoring Metrics
- **Throughput Monitoring**: Monitor encoding/decoding data processing rates
- **Latency Analysis**: Analyze processing latency for different data sizes
- **CPU Utilization**: Observe CPU utilization efficiency of SIMD instructions
- **Memory Usage**: Monitor memory allocation patterns of different implementations
## 🔧 Troubleshooting
### Performance Issue Diagnosis
#### Issue 1: Performance Not Meeting Expectations
**Symptom**: SIMD mode performance improvement not significant
**Cause**: Data size may not be suitable for SIMD optimization
**Solution**:
```rust
// Check shard size and data characteristics
let shard_size = data.len().div_ceil(data_shards);
println!("Shard size: {} bytes", shard_size);
if shard_size >= 1024 {
println!("Good candidate for SIMD optimization");
} else {
println!("Consider using default Erasure mode");
}
```
#### 问题3: 内存使用异常
**现象**: 内存使用超出预期
**原因**: SIMD 实现的内存对齐要求
**解决**:
#### Issue 2: Compilation Errors
**Symptom**: SIMD-related compilation errors
**Cause**: Platform not supported or missing dependencies
**Solution**:
```bash
# 使用纯 Erasure 模式进行对比
# Check platform support
cargo check --features reed-solomon-simd
# If failed, use default mode
cargo check
```
#### Issue 3: Abnormal Memory Usage
**Symptom**: Memory usage exceeds expectations
**Cause**: Memory alignment requirements of SIMD implementation
**Solution**:
```bash
# Use pure Erasure mode for comparison
cargo run --features reed-solomon-erasure
```
### 调试技巧
### Debugging Tips
#### 1强制使用特定模式
#### 1Performance Comparison Testing
```bash
# 测试纯 Erasure 模式性能
# Test pure Erasure mode performance
cargo bench --features reed-solomon-erasure
# 测试默认模式性能(纯 Erasure
# Test default (pure Erasure) mode performance
cargo bench
# Test SIMD mode performance
cargo bench --features reed-solomon-simd
```
#### 2分析分片大小分布
#### 2Analyze Data Characteristics
```rust
// 统计你的应用中的分片大小分布
let shard_sizes: Vec<usize> = data_samples.iter()
.map(|data| data.len().div_ceil(data_shards))
// Statistics of data characteristics in your application
let data_sizes: Vec<usize> = data_samples.iter()
.map(|data| data.len())
.collect();
let simd_eligible = shard_sizes.iter()
.filter(|&&size| size >= 512)
let large_data_count = data_sizes.iter()
.filter(|&&size| size >= 1024 * 1024)
.count();
println!("SIMD eligible: {}/{} ({}%)",
simd_eligible,
shard_sizes.len(),
simd_eligible * 100 / shard_sizes.len()
println!("Large data (>1MB): {}/{} ({}%)",
large_data_count,
data_sizes.len(),
large_data_count * 100 / data_sizes.len()
);
```
#### 3基准测试对比
#### 3Benchmark Comparison
```bash
# 生成详细的性能对比报告
# Generate detailed performance comparison report
./run_benchmarks.sh comparison
# 查看 HTML 报告分析性能差异
# View HTML report to analyze performance differences
cd target/criterion && python3 -m http.server 8080
```
## 📈 性能优化建议
## 📈 Performance Optimization Recommendations
### 应用层优化
### Application Layer Optimization
#### 1数据分块策略
#### 1Data Chunking Strategy
```rust
// 针对混合模式优化数据分块
// Optimize data chunking for SIMD mode
const OPTIMAL_BLOCK_SIZE: usize = 1024 * 1024; // 1MB
const MIN_SIMD_BLOCK_SIZE: usize = data_shards * 512; // 确保分片 >= 512B
const MIN_EFFICIENT_SIZE: usize = 64 * 1024; // 64KB
let block_size = if data.len() < MIN_SIMD_BLOCK_SIZE {
data.len() // 小数据直接处理,会自动回退
let block_size = if data.len() < MIN_EFFICIENT_SIZE {
data.len() // Small data can consider default mode
} else {
OPTIMAL_BLOCK_SIZE.min(data.len()) // 使用最优块大小
OPTIMAL_BLOCK_SIZE.min(data.len()) // Use optimal block size
};
```
#### 2配置调优
#### 2Configuration Tuning
```rust
// 根据典型数据大小选择纠删码配置
// Choose erasure coding configuration based on typical data size
let (data_shards, parity_shards) = if typical_file_size > 1024 * 1024 {
(8, 4) // 大文件:更多并行度,利用 SIMD
(8, 4) // Large files: more parallelism, utilize SIMD
} else {
(4, 2) // 小文件:简单配置,减少开销
(4, 2) // Small files: simple configuration, reduce overhead
};
```
### 系统层优化
### System Layer Optimization
#### 1⃣ CPU 特性检测
#### 1⃣ CPU Feature Detection
```bash
# 检查 CPU 支持的 SIMD 指令集
# Check CPU supported SIMD instruction sets
lscpu | grep -i flags
cat /proc/cpuinfo | grep -i flags | head -1
```
#### 2内存对齐优化
#### 2Memory Alignment Optimization
```rust
// 确保数据内存对齐以提升 SIMD 性能
// Ensure data memory alignment to improve SIMD performance
use aligned_vec::AlignedVec;
let aligned_data = AlignedVec::<u8, aligned_vec::A64>::from_slice(&data);
```
---
💡 **关键结论**:
- 🎯 **混合模式(默认)是最佳选择**:兼顾性能和兼容性
- 🔄 **智能回退机制**:解决了传统 SIMD 模式的兼容性问题
- 📊 **透明优化**:用户无需关心实现细节,系统自动选择最优策略
- 🛡️ **零失败风险**:在任何配置下都能正常工作
💡 **Key Conclusions**:
- 🎯 **Pure Erasure mode (default) is the best general choice**: Stable and reliable, suitable for most scenarios
- 🚀 **SIMD mode suitable for high-performance scenarios**: Best choice for large data processing
- 📊 **Choose based on data characteristics**: Small data use Erasure, large data consider SIMD
- 🛡️ **Stability priority**: Production environments recommend using default Erasure mode

View File

@@ -0,0 +1,333 @@
# Reed-Solomon 实现对比分析
## 🔍 问题分析
随着SIMD模式的优化设计我们提供了高性能的Reed-Solomon实现。现在系统能够在不同场景下提供最优的性能表现。
## 📊 实现模式对比
### 🏛️ 纯 Erasure 模式(默认,推荐)
**默认配置**: 不指定任何 feature使用稳定的 reed-solomon-erasure 实现
**特点**:
-**广泛兼容**: 支持任意分片大小,从字节级到 GB 级
- 📈 **稳定性能**: 性能对分片大小不敏感,可预测
- 🔧 **生产就绪**: 成熟稳定的实现,已在生产环境广泛使用
- 💾 **内存高效**: 优化的内存使用模式
- 🎯 **一致性**: 在所有场景下行为完全一致
**使用场景**:
- 大多数生产环境的默认选择
- 需要完全一致和可预测的性能行为
- 对性能变化敏感的系统
- 主要处理小文件或小分片的场景
- 需要严格的内存使用控制
### 🎯 SIMD模式`reed-solomon-simd` feature
**配置**: `--features reed-solomon-simd`
**特点**:
- 🚀 **高性能SIMD**: 使用SIMD指令集进行高性能编码解码
- 🎯 **性能导向**: 专注于最大化处理性能
-**大数据优化**: 适合大数据量处理的高吞吐量场景
- 🏎️ **速度优先**: 为性能关键型应用设计
**使用场景**:
- 需要最大化性能的应用场景
- 处理大量数据的高吞吐量系统
- 对性能要求极高的场景
- CPU密集型工作负载
## 📏 分片大小与性能对比
不同配置下的性能表现:
| 数据大小 | 配置 | 分片大小 | 纯 Erasure 模式(默认) | SIMD模式策略 | 性能对比 |
|---------|------|----------|------------------------|-------------|----------|
| 1KB | 4+2 | 256字节 | Erasure 实现 | SIMD 实现 | SIMD可能更快 |
| 1KB | 6+3 | 171字节 | Erasure 实现 | SIMD 实现 | SIMD可能更快 |
| 1KB | 8+4 | 128字节 | Erasure 实现 | SIMD 实现 | SIMD可能更快 |
| 64KB | 4+2 | 16KB | Erasure 实现 | SIMD 优化 | SIMD模式更快 |
| 64KB | 6+3 | 10.7KB | Erasure 实现 | SIMD 优化 | SIMD模式更快 |
| 1MB | 4+2 | 256KB | Erasure 实现 | SIMD 优化 | SIMD模式显著更快 |
| 16MB | 8+4 | 2MB | Erasure 实现 | SIMD 优化 | SIMD模式大幅领先 |
## 🎯 基准测试结果解读
### 纯 Erasure 模式示例(默认) ✅
```
encode_comparison/implementation/1KB_6+3_erasure
time: [245.67 ns 256.78 ns 267.89 ns]
thrpt: [3.73 GiB/s 3.89 GiB/s 4.07 GiB/s]
💡 一致的 Erasure 性能 - 所有配置都使用相同实现
```
```
encode_comparison/implementation/64KB_4+2_erasure
time: [2.3456 μs 2.4567 μs 2.5678 μs]
thrpt: [23.89 GiB/s 24.65 GiB/s 25.43 GiB/s]
💡 稳定可靠的性能 - 适合大多数生产场景
```
### SIMD模式成功示例 ✅
**大分片 SIMD 优化**:
```
encode_comparison/implementation/64KB_4+2_simd
time: [1.2345 μs 1.2567 μs 1.2789 μs]
thrpt: [47.89 GiB/s 48.65 GiB/s 49.43 GiB/s]
💡 使用 SIMD 优化 - 分片大小: 16KB高性能处理
```
**小分片 SIMD 处理**:
```
encode_comparison/implementation/1KB_6+3_simd
time: [234.56 ns 245.67 ns 256.78 ns]
thrpt: [3.89 GiB/s 4.07 GiB/s 4.26 GiB/s]
💡 SIMD 处理小分片 - 分片大小: 171字节
```
## 🛠️ 使用指南
### 选择策略
#### 1⃣ 推荐:纯 Erasure 模式(默认)
```bash
# 无需指定 feature使用默认配置
cargo run
cargo test
cargo bench
```
**适用场景**:
- 📊 **一致性要求**: 需要完全可预测的性能行为
- 🔬 **生产环境**: 大多数生产场景的最佳选择
- 💾 **内存敏感**: 对内存使用模式有严格要求
- 🏗️ **稳定可靠**: 成熟稳定的实现
#### 2⃣ 高性能需求SIMD模式
```bash
# 启用SIMD模式获得最大性能
cargo run --features reed-solomon-simd
cargo test --features reed-solomon-simd
cargo bench --features reed-solomon-simd
```
**适用场景**:
- 🎯 **高性能场景**: 处理大量数据需要最大吞吐量
- 🚀 **性能优化**: 希望在大数据时获得最佳性能
-**速度优先**: 对处理速度有极高要求的场景
- 🏎️ **计算密集**: CPU密集型工作负载
### 配置优化建议
#### 针对数据大小的配置
**小文件为主** (< 64KB):
```toml
# 推荐使用默认纯 Erasure 模式
# 无需特殊配置,性能稳定可靠
```
**大文件为主** (> 1MB):
```toml
# 建议启用SIMD模式获得更高性能
# features = ["reed-solomon-simd"]
```
**混合场景**:
```toml
# 默认纯 Erasure 模式适合大多数场景
# 如需最大性能可启用: features = ["reed-solomon-simd"]
```
#### 针对纠删码配置的建议
| 配置 | 小数据 (< 64KB) | 大数据 (> 1MB) | 推荐模式 |
|------|----------------|----------------|----------|
| 4+2 | 纯 Erasure | 纯 Erasure / SIMD模式 | 纯 Erasure默认 |
| 6+3 | 纯 Erasure | 纯 Erasure / SIMD模式 | 纯 Erasure默认 |
| 8+4 | 纯 Erasure | 纯 Erasure / SIMD模式 | 纯 Erasure默认 |
| 10+5 | 纯 Erasure | 纯 Erasure / SIMD模式 | 纯 Erasure默认 |
### 生产环境部署建议
#### 1⃣ 默认部署策略
```bash
# 生产环境推荐配置:使用纯 Erasure 模式(默认)
cargo build --release
```
**优势**:
- ✅ 最大兼容性:处理任意大小数据
- ✅ 稳定可靠:成熟的实现,行为可预测
- ✅ 零配置:无需复杂的性能调优
- ✅ 内存高效:优化的内存使用模式
#### 2⃣ 高性能部署策略
```bash
# 高性能场景启用SIMD模式
cargo build --release --features reed-solomon-simd
```
**优势**:
- ✅ 最优性能SIMD指令集优化
- ✅ 高吞吐量:适合大数据处理
- ✅ 性能导向:专注于最大化处理速度
- ✅ 现代硬件充分利用现代CPU特性
#### 2⃣ 监控和调优
```rust
// 根据具体场景选择合适的实现
match data_size {
size if size > 1024 * 1024 => {
// 大数据考虑使用SIMD模式
println!("Large data detected, SIMD mode recommended");
}
_ => {
// 一般情况使用默认Erasure模式
println!("Using default Erasure mode");
}
}
```
#### 3⃣ 性能监控指标
- **吞吐量监控**: 监控编码/解码的数据处理速率
- **延迟分析**: 分析不同数据大小的处理延迟
- **CPU使用率**: 观察SIMD指令的CPU利用效率
- **内存使用**: 监控不同实现的内存分配模式
## 🔧 故障排除
### 性能问题诊断
#### 问题1: 性能不符合预期
**现象**: SIMD模式性能提升不明显
**原因**: 可能数据大小不适合SIMD优化
**解决**:
```rust
// 检查分片大小和数据特征
let shard_size = data.len().div_ceil(data_shards);
println!("Shard size: {} bytes", shard_size);
if shard_size >= 1024 {
println!("Good candidate for SIMD optimization");
} else {
println!("Consider using default Erasure mode");
}
```
#### 问题2: 编译错误
**现象**: SIMD相关的编译错误
**原因**: 平台不支持或依赖缺失
**解决**:
```bash
# 检查平台支持
cargo check --features reed-solomon-simd
# 如果失败,使用默认模式
cargo check
```
#### 问题3: 内存使用异常
**现象**: 内存使用超出预期
**原因**: SIMD实现的内存对齐要求
**解决**:
```bash
# 使用纯 Erasure 模式进行对比
cargo run --features reed-solomon-erasure
```
### 调试技巧
#### 1⃣ 性能对比测试
```bash
# 测试纯 Erasure 模式性能
cargo bench --features reed-solomon-erasure
# 测试SIMD模式性能
cargo bench --features reed-solomon-simd
```
#### 2⃣ 分析数据特征
```rust
// 统计你的应用中的数据特征
let data_sizes: Vec<usize> = data_samples.iter()
.map(|data| data.len())
.collect();
let large_data_count = data_sizes.iter()
.filter(|&&size| size >= 1024 * 1024)
.count();
println!("Large data (>1MB): {}/{} ({}%)",
large_data_count,
data_sizes.len(),
large_data_count * 100 / data_sizes.len()
);
```
#### 3⃣ 基准测试对比
```bash
# 生成详细的性能对比报告
./run_benchmarks.sh comparison
# 查看 HTML 报告分析性能差异
cd target/criterion && python3 -m http.server 8080
```
## 📈 性能优化建议
### 应用层优化
#### 1⃣ 数据分块策略
```rust
// 针对SIMD模式优化数据分块
const OPTIMAL_BLOCK_SIZE: usize = 1024 * 1024; // 1MB
const MIN_EFFICIENT_SIZE: usize = 64 * 1024; // 64KB
let block_size = if data.len() < MIN_EFFICIENT_SIZE {
data.len() // 小数据可以考虑默认模式
} else {
OPTIMAL_BLOCK_SIZE.min(data.len()) // 使用最优块大小
};
```
#### 2⃣ 配置调优
```rust
// 根据典型数据大小选择纠删码配置
let (data_shards, parity_shards) = if typical_file_size > 1024 * 1024 {
(8, 4) // 大文件:更多并行度,利用 SIMD
} else {
(4, 2) // 小文件:简单配置,减少开销
};
```
### 系统层优化
#### 1⃣ CPU 特性检测
```bash
# 检查 CPU 支持的 SIMD 指令集
lscpu | grep -i flags
cat /proc/cpuinfo | grep -i flags | head -1
```
#### 2⃣ 内存对齐优化
```rust
// 确保数据内存对齐以提升 SIMD 性能
use aligned_vec::AlignedVec;
let aligned_data = AlignedVec::<u8, aligned_vec::A64>::from_slice(&data);
```
---
💡 **关键结论**:
- 🎯 **纯Erasure模式默认是最佳通用选择**:稳定可靠,适合大多数场景
- 🚀 **SIMD模式适合高性能场景**:大数据处理的最佳选择
- 📊 **根据数据特征选择**小数据用Erasure大数据考虑SIMD
- 🛡️ **稳定性优先**生产环境建议使用默认Erasure模式

View File

@@ -2,7 +2,7 @@
//!
//! This benchmark compares the performance of different Reed-Solomon implementations:
//! - Default (Pure erasure): Stable reed-solomon-erasure implementation
//! - `reed-solomon-simd` feature: Hybrid mode with SIMD optimization and erasure fallback
//! - `reed-solomon-simd` feature: SIMD mode with optimized performance
//!
//! ## Running Benchmarks
//!

View File

@@ -2,7 +2,7 @@
# Reed-Solomon 实现性能比较脚本
#
# 这个脚本将运行不同的基准测试来比较混合模式和纯Erasure模式的性能
# 这个脚本将运行不同的基准测试来比较SIMD模式和纯Erasure模式的性能
#
# 使用方法:
# ./run_benchmarks.sh [quick|full|comparison]
@@ -74,15 +74,16 @@ run_erasure_benchmark() {
print_success "纯 Erasure 模式基准测试完成"
}
# 运行混合模式基准测试(默认)
run_hybrid_benchmark() {
print_info "🎯 开始运行混合模式基准测试(默认)..."
# 运行SIMD模式基准测试
run_simd_benchmark() {
print_info "🎯 开始运行SIMD模式基准测试..."
echo "================================================"
cargo bench --bench comparison_benchmark \
-- --save-baseline hybrid_baseline
--features reed-solomon-simd \
-- --save-baseline simd_baseline
print_success "混合模式基准测试完成"
print_success "SIMD模式基准测试完成"
}
# 运行完整的基准测试套件
@@ -90,7 +91,7 @@ run_full_benchmark() {
print_info "🚀 开始运行完整基准测试套件..."
echo "================================================"
# 运行详细的基准测试(使用默认混合模式)
# 运行详细的基准测试(使用默认纯Erasure模式)
cargo bench --bench erasure_benchmark
print_success "完整基准测试套件完成"
@@ -106,8 +107,9 @@ run_comparison_benchmark() {
--features reed-solomon-erasure \
-- --save-baseline erasure_baseline
print_info "步骤 2: 测试混合模式并与 Erasure 模式对比..."
print_info "步骤 2: 测试SIMD模式并与 Erasure 模式对比..."
cargo bench --bench comparison_benchmark \
--features reed-solomon-simd \
-- --baseline erasure_baseline
print_success "性能对比测试完成"
@@ -141,8 +143,9 @@ run_quick_test() {
--features reed-solomon-erasure \
-- encode_comparison --quick
print_info "测试混合模式(默认)..."
print_info "测试SIMD模式..."
cargo bench --bench comparison_benchmark \
--features reed-solomon-simd \
-- encode_comparison --quick
print_success "快速测试完成"
@@ -153,31 +156,31 @@ show_help() {
echo "Reed-Solomon 性能基准测试脚本"
echo ""
echo "实现模式:"
echo " 🎯 混合模式(默认) - SIMD + Erasure 智能回退,推荐使用"
echo " 🏛️ 纯 Erasure 模式 - 稳定兼容的 reed-solomon-erasure 实现"
echo " 🏛️ 纯 Erasure 模式(默认)- 稳定兼容的 reed-solomon-erasure 实现"
echo " 🎯 SIMD模式 - 高性能SIMD优化实现"
echo ""
echo "使用方法:"
echo " $0 [command]"
echo ""
echo "命令:"
echo " quick 运行快速性能测试"
echo " full 运行完整基准测试套件(混合模式)"
echo " full 运行完整基准测试套件(默认Erasure模式)"
echo " comparison 运行详细的实现模式对比测试"
echo " erasure 只测试纯 Erasure 模式"
echo " hybrid 只测试混合模式(默认行为)"
echo " simd 只测试SIMD模式"
echo " clean 清理测试结果"
echo " help 显示此帮助信息"
echo ""
echo "示例:"
echo " $0 quick # 快速测试两种模式"
echo " $0 comparison # 详细对比测试"
echo " $0 full # 完整测试套件(混合模式)"
echo " $0 hybrid # 只测试混合模式"
echo " $0 full # 完整测试套件(默认Erasure模式)"
echo " $0 simd # 只测试SIMD模式"
echo " $0 erasure # 只测试纯 Erasure 模式"
echo ""
echo "模式说明:"
echo " 混合模式: 大分片(≥512B)使用SIMD优化小分片自动回退到Erasure"
echo " Erasure模式: 所有情况都使用reed-solomon-erasure实现"
echo " Erasure模式: 使用reed-solomon-erasure实现稳定可靠"
echo " SIMD模式: 使用reed-solomon-simd实现高性能优化"
}
# 显示测试配置信息
@@ -193,16 +196,16 @@ show_test_info() {
if [ -f "/proc/cpuinfo" ]; then
echo " - CPU 型号: $(grep 'model name' /proc/cpuinfo | head -1 | cut -d: -f2 | xargs)"
if grep -q "avx2" /proc/cpuinfo; then
echo " - SIMD 支持: AVX2 ✅ (混合模式将利用SIMD优化)"
echo " - SIMD 支持: AVX2 ✅ (SIMD模式将利用SIMD优化)"
elif grep -q "sse4" /proc/cpuinfo; then
echo " - SIMD 支持: SSE4 ✅ (混合模式将利用SIMD优化)"
echo " - SIMD 支持: SSE4 ✅ (SIMD模式将利用SIMD优化)"
else
echo " - SIMD 支持: 未检测到高级 SIMD 特性 (混合模式将主要使用Erasure)"
echo " - SIMD 支持: 未检测到高级 SIMD 特性"
fi
fi
echo " - 默认模式: 混合模式 (SIMD + Erasure 智能回退)"
echo " - 回退阈值: 512字节分片大小"
echo " - 默认模式: 纯Erasure模式 (稳定可靠)"
echo " - 高性能模式: SIMD模式 (性能优化)"
echo ""
}
@@ -234,9 +237,9 @@ main() {
run_erasure_benchmark
generate_comparison_report
;;
"hybrid")
"simd")
cleanup
run_hybrid_benchmark
run_simd_benchmark
generate_comparison_report
;;
"clean")
@@ -254,7 +257,7 @@ main() {
esac
print_success "✨ 基准测试执行完成!"
print_info "💡 提示: 推荐使用混合模式默认它能自动在SIMD和Erasure之间智能选择"
print_info "💡 提示: 推荐使用默认的纯Erasure模式对于高性能需求可考虑SIMD模式"
}
# 如果直接运行此脚本,调用主函数

File diff suppressed because it is too large Load Diff

View File

@@ -38,13 +38,13 @@ use crate::utils::path::{
path_join, path_join_buf,
};
use crate::erasure_coding::bitrot_verify;
use common::defer;
use path_absolutize::Absolutize;
use rustfs_filemeta::{
Cache, FileInfo, FileInfoOpts, FileMeta, MetaCacheEntry, MetacacheWriter, Opts, RawFileInfo, UpdateFn, get_file_info,
read_xl_meta_no_data,
};
use rustfs_rio::bitrot_verify;
use rustfs_utils::HashAlgorithm;
use std::collections::{HashMap, HashSet};
use std::fmt::Debug;

View File

@@ -0,0 +1,458 @@
use pin_project_lite::pin_project;
use rustfs_utils::{HashAlgorithm, read_full, write_all};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite};
pin_project! {
    /// BitrotReader reads (hash+data) blocks from an async reader and verifies hash integrity.
    ///
    /// Each block on the wire is laid out as `hash || data`, where the hash
    /// length is `hash_algo.size()` and the data length is at most `shard_size`.
    pub struct BitrotReader<R> {
        #[pin]
        inner: R,
        // Algorithm used to verify each block; a zero-size algorithm disables verification.
        hash_algo: HashAlgorithm,
        // Maximum number of data bytes in a single block.
        shard_size: usize,
    }
}

impl<R> BitrotReader<R>
where
    R: AsyncRead + Unpin + Send + Sync,
{
    /// Create a new BitrotReader.
    pub fn new(inner: R, shard_size: usize, algo: HashAlgorithm) -> Self {
        Self {
            inner,
            hash_algo: algo,
            shard_size,
        }
    }

    /// Read a single (hash+data) block, verify its hash, and return the number
    /// of data bytes copied into `out`.
    ///
    /// # Errors
    /// - `InvalidInput` if `out.len()` exceeds `shard_size`.
    /// - `InvalidData` if the stored hash does not match the data actually read.
    /// - Any underlying I/O error from the inner reader.
    pub async fn read(&mut self, out: &mut [u8]) -> std::io::Result<usize> {
        if out.len() > self.shard_size {
            return Err(std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                format!("data size {} exceeds shard size {}", out.len(), self.shard_size),
            ));
        }

        let hash_size = self.hash_algo.size();

        // Read the leading hash for this block (absent when the algorithm has no output).
        let mut hash_buf = vec![0u8; hash_size];
        if hash_size > 0 {
            self.inner.read_exact(&mut hash_buf).await?;
        }

        // Read up to `out.len()` data bytes; a short final block is allowed.
        let data_len = read_full(&mut self.inner, out).await?;

        if hash_size > 0 {
            let actual_hash = self.hash_algo.hash_encode(&out[..data_len]);
            if actual_hash != hash_buf {
                return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "bitrot hash mismatch"));
            }
        }

        Ok(data_len)
    }
}
pin_project! {
    /// BitrotWriter writes (hash+data) blocks to an async writer.
    pub struct BitrotWriter<W> {
        #[pin]
        inner: W,
        // Hash algorithm used to prefix each block; `size() == 0` means no hash is written.
        hash_algo: HashAlgorithm,
        // Maximum number of data bytes permitted in a single block.
        shard_size: usize,
        // Scratch buffer used to assemble `hash || data` for a single write call.
        buf: Vec<u8>,
        // Set once a short (< shard_size) block is written; later writes are rejected.
        finished: bool,
    }
}
impl<W> BitrotWriter<W>
where
    W: AsyncWrite + Unpin + Send + Sync,
{
    /// Create a new BitrotWriter.
    pub fn new(inner: W, shard_size: usize, algo: HashAlgorithm) -> Self {
        // Pre-size the scratch buffer for one full `hash || data` block so the
        // hot path in `write` never has to reallocate.
        let buf = Vec::with_capacity(algo.size() + shard_size);
        Self {
            inner,
            hash_algo: algo,
            shard_size,
            buf,
            finished: false,
        }
    }

    /// Consume the writer and return the underlying sink.
    pub fn into_inner(self) -> W {
        self.inner
    }

    /// Write a (hash+data) block. Returns the number of data bytes written.
    ///
    /// # Errors
    /// - `InvalidInput` if called after a short (final) block was already
    ///   written, or if `buf.len()` exceeds `shard_size`.
    /// - `WriteZero` if the sink accepted fewer bytes than the hash prefix.
    pub async fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        if buf.is_empty() {
            return Ok(0);
        }

        if self.finished {
            return Err(std::io::Error::new(std::io::ErrorKind::InvalidInput, "bitrot writer already finished"));
        }

        if buf.len() > self.shard_size {
            return Err(std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                format!("data size {} exceeds shard size {}", buf.len(), self.shard_size),
            ));
        }

        // A short block can only be the last one; reject any write after it.
        if buf.len() < self.shard_size {
            self.finished = true;
        }

        let hash_algo = &self.hash_algo;

        if hash_algo.size() > 0 {
            let hash = hash_algo.hash_encode(buf);
            self.buf.extend_from_slice(&hash);
        }

        self.buf.extend_from_slice(buf);

        // Write hash+data in one call.
        let mut n = write_all(&mut self.inner, &self.buf).await?;
        if n < hash_algo.size() {
            return Err(std::io::Error::new(
                std::io::ErrorKind::WriteZero,
                "short write: not enough bytes written",
            ));
        }
        // Report only the data bytes, excluding the hash prefix.
        n -= hash_algo.size();

        self.buf.clear();
        Ok(n)
    }
}
/// Compute the on-disk size of a bitrot-protected shard file.
///
/// Only the streaming `HighwayHash256S` format stores a per-block hash prefix;
/// for every other algorithm the file stays at its raw `size`.
pub fn bitrot_shard_file_size(size: usize, shard_size: usize, algo: HashAlgorithm) -> usize {
    match algo {
        // One hash of `algo.size()` bytes precedes each `shard_size` block.
        HashAlgorithm::HighwayHash256S => size + size.div_ceil(shard_size) * algo.size(),
        _ => size,
    }
}
/// Verify the bitrot integrity of a complete shard stream.
///
/// Reads `(hash, data)` blocks back-to-back from `r`, recomputing the hash of
/// each data block and comparing it against the stored prefix. `want_size` is
/// the total on-disk size (hash prefixes included) and must agree with
/// `bitrot_shard_file_size(part_size, shard_size, algo)`.
///
/// NOTE(review): for algorithms other than `HighwayHash256S`,
/// `bitrot_shard_file_size` returns `part_size` unchanged, yet this loop still
/// reads `algo.size()` hash bytes per block — confirm callers only pass
/// streaming-format shards (or a zero-size algorithm) here.
pub async fn bitrot_verify<R: AsyncRead + Unpin + Send>(
    mut r: R,
    want_size: usize,
    part_size: usize,
    algo: HashAlgorithm,
    _want: Vec<u8>,
    mut shard_size: usize,
) -> std::io::Result<()> {
    let mut hash_buf = vec![0; algo.size()];
    // `left` tracks the remaining on-disk bytes (hash prefixes + data).
    let mut left = want_size;

    if left != bitrot_shard_file_size(part_size, shard_size, algo.clone()) {
        return Err(std::io::Error::other("bitrot shard file size mismatch"));
    }

    while left > 0 {
        // Read the stored hash for this block.
        let n = r.read_exact(&mut hash_buf).await?;
        left -= n;
        // The final block may carry fewer than `shard_size` data bytes.
        if left < shard_size {
            shard_size = left;
        }
        let mut buf = vec![0; shard_size];
        let read = r.read_exact(&mut buf).await?;
        let actual_hash = algo.hash_encode(&buf);
        if actual_hash != hash_buf[0..n] {
            return Err(std::io::Error::other("bitrot hash mismatch"));
        }
        left -= read;
    }
    Ok(())
}
/// Custom writer enum that supports inline buffer storage
///
/// Used by `BitrotWriterWrapper` so that small shards can be captured in
/// memory (`InlineBuffer`) while larger ones stream to any boxed async sink.
pub enum CustomWriter {
    /// Inline buffer writer - stores data in memory
    InlineBuffer(Vec<u8>),
    /// Disk-based writer using tokio file
    Other(Box<dyn AsyncWrite + Unpin + Send + Sync>),
}
impl CustomWriter {
/// Create a new inline buffer writer
pub fn new_inline_buffer() -> Self {
Self::InlineBuffer(Vec::new())
}
/// Create a new disk writer from any AsyncWrite implementation
pub fn new_tokio_writer<W>(writer: W) -> Self
where
W: AsyncWrite + Unpin + Send + Sync + 'static,
{
Self::Other(Box::new(writer))
}
/// Get the inline buffer data if this is an inline buffer writer
pub fn get_inline_data(&self) -> Option<&[u8]> {
match self {
Self::InlineBuffer(data) => Some(data),
Self::Other(_) => None,
}
}
/// Extract the inline buffer data, consuming the writer
pub fn into_inline_data(self) -> Option<Vec<u8>> {
match self {
Self::InlineBuffer(data) => Some(data),
Self::Other(_) => None,
}
}
}
impl AsyncWrite for CustomWriter {
    fn poll_write(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        buf: &[u8],
    ) -> std::task::Poll<std::io::Result<usize>> {
        // `get_mut` is available because the boxed inner writer is `Unpin`.
        match self.get_mut() {
            Self::InlineBuffer(data) => {
                // In-memory sink: always ready, accepts the whole buffer at once.
                data.extend_from_slice(buf);
                std::task::Poll::Ready(Ok(buf.len()))
            }
            Self::Other(writer) => {
                // Delegate to the boxed writer, re-pinning the unboxed reference.
                let pinned_writer = std::pin::Pin::new(writer.as_mut());
                pinned_writer.poll_write(cx, buf)
            }
        }
    }

    fn poll_flush(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<std::io::Result<()>> {
        match self.get_mut() {
            // Nothing to flush for the in-memory buffer.
            Self::InlineBuffer(_) => std::task::Poll::Ready(Ok(())),
            Self::Other(writer) => {
                let pinned_writer = std::pin::Pin::new(writer.as_mut());
                pinned_writer.poll_flush(cx)
            }
        }
    }

    fn poll_shutdown(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<std::io::Result<()>> {
        match self.get_mut() {
            // The in-memory buffer needs no shutdown handshake.
            Self::InlineBuffer(_) => std::task::Poll::Ready(Ok(())),
            Self::Other(writer) => {
                let pinned_writer = std::pin::Pin::new(writer.as_mut());
                pinned_writer.poll_shutdown(cx)
            }
        }
    }
}
/// Wrapper around BitrotWriter that uses our custom writer
///
/// Pairs the bitrot writer with a `WriterType` tag so the inline buffer can be
/// recovered after the writer has been consumed.
pub struct BitrotWriterWrapper {
    bitrot_writer: BitrotWriter<CustomWriter>,
    writer_type: WriterType,
}
/// Enum to track the type of writer we're using
enum WriterType {
    // Data accumulates in memory (see `CustomWriter::InlineBuffer`).
    InlineBuffer,
    // Any other boxed async sink.
    Other,
}
impl std::fmt::Debug for BitrotWriterWrapper {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Only the writer kind is meaningful to show; the inner writer carries no Debug impl.
        let kind = match self.writer_type {
            WriterType::InlineBuffer => "InlineBuffer",
            WriterType::Other => "Other",
        };
        f.debug_struct("BitrotWriterWrapper").field("writer_type", &kind).finish()
    }
}
impl BitrotWriterWrapper {
    /// Create a new BitrotWriterWrapper with custom writer
    pub fn new(writer: CustomWriter, shard_size: usize, checksum_algo: HashAlgorithm) -> Self {
        // Remember which variant was wrapped so the data can be recovered later
        // without borrowing the (consumed) inner writer.
        let writer_type = if matches!(writer, CustomWriter::InlineBuffer(_)) {
            WriterType::InlineBuffer
        } else {
            WriterType::Other
        };
        Self {
            bitrot_writer: BitrotWriter::new(writer, shard_size, checksum_algo),
            writer_type,
        }
    }

    /// Write data to the bitrot writer
    pub async fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        self.bitrot_writer.write(buf).await
    }

    /// Extract the inline buffer data, consuming the wrapper
    pub fn into_inline_data(self) -> Option<Vec<u8>> {
        if let WriterType::InlineBuffer = self.writer_type {
            self.bitrot_writer.into_inner().into_inline_data()
        } else {
            None
        }
    }
}
#[cfg(test)]
mod tests {
    use super::BitrotReader;
    use super::BitrotWriter;
    use rustfs_utils::HashAlgorithm;
    use std::io::Cursor;

    /// Round-trips data through BitrotWriter then BitrotReader and checks equality.
    #[tokio::test]
    async fn test_bitrot_read_write_ok() {
        let data = b"hello world! this is a test shard.";
        let data_size = data.len();
        let shard_size = 8;

        let buf: Vec<u8> = Vec::new();
        let writer = Cursor::new(buf);
        let mut bitrot_writer = BitrotWriter::new(writer, shard_size, HashAlgorithm::HighwayHash256);
        let mut n = 0;
        for chunk in data.chunks(shard_size) {
            n += bitrot_writer.write(chunk).await.unwrap();
        }
        assert_eq!(n, data.len());

        // Read back: rewind the cursor first, otherwise reads start at EOF
        // (writing advanced the cursor position to the end).
        let mut reader = bitrot_writer.into_inner();
        reader.set_position(0);
        let mut bitrot_reader = BitrotReader::new(reader, shard_size, HashAlgorithm::HighwayHash256);
        let mut out = Vec::new();
        let mut n = 0;
        while n < data_size {
            let mut buf = vec![0u8; shard_size];
            let m = bitrot_reader.read(&mut buf).await.unwrap();
            assert_eq!(&buf[..m], &data[n..n + m]);
            out.extend_from_slice(&buf[..m]);
            n += m;
        }
        assert_eq!(n, data_size);
        assert_eq!(data, &out[..]);
    }

    /// Corrupts the last byte of the stream and expects the final block read to fail.
    #[tokio::test]
    async fn test_bitrot_read_hash_mismatch() {
        let data = b"test data for bitrot";
        let data_size = data.len();
        let shard_size = 8;

        let buf: Vec<u8> = Vec::new();
        let writer = Cursor::new(buf);
        let mut bitrot_writer = BitrotWriter::new(writer, shard_size, HashAlgorithm::HighwayHash256);
        for chunk in data.chunks(shard_size) {
            let _ = bitrot_writer.write(chunk).await.unwrap();
        }

        let mut written = bitrot_writer.into_inner().into_inner();
        // Flip the last byte to force a hash mismatch in the final block.
        let pos = written.len() - 1;
        written[pos] ^= 0xFF;

        let reader = Cursor::new(written);
        let mut bitrot_reader = BitrotReader::new(reader, shard_size, HashAlgorithm::HighwayHash256);
        let count = data_size.div_ceil(shard_size);
        let mut idx = 0;
        let mut n = 0;
        while n < data_size {
            let mut buf = vec![0u8; shard_size];
            let res = bitrot_reader.read(&mut buf).await;
            if idx == count - 1 {
                // The last (corrupted) block must fail verification.
                assert!(res.is_err());
                assert_eq!(res.unwrap_err().kind(), std::io::ErrorKind::InvalidData);
                break;
            }
            let m = res.unwrap();
            assert_eq!(&buf[..m], &data[n..n + m]);
            n += m;
            idx += 1;
        }
    }

    /// Round-trip with HashAlgorithm::None: no hash prefix is written or checked.
    #[tokio::test]
    async fn test_bitrot_read_write_none_hash() {
        let data = b"bitrot none hash test data!";
        let data_size = data.len();
        let shard_size = 8;

        let buf: Vec<u8> = Vec::new();
        let writer = Cursor::new(buf);
        let mut bitrot_writer = BitrotWriter::new(writer, shard_size, HashAlgorithm::None);
        let mut n = 0;
        for chunk in data.chunks(shard_size) {
            n += bitrot_writer.write(chunk).await.unwrap();
        }
        assert_eq!(n, data.len());

        // Rewind before reading back (the cursor is positioned at the end after writing).
        let mut reader = bitrot_writer.into_inner();
        reader.set_position(0);
        let mut bitrot_reader = BitrotReader::new(reader, shard_size, HashAlgorithm::None);
        let mut out = Vec::new();
        let mut n = 0;
        while n < data_size {
            let mut buf = vec![0u8; shard_size];
            let m = bitrot_reader.read(&mut buf).await.unwrap();
            assert_eq!(&buf[..m], &data[n..n + m]);
            out.extend_from_slice(&buf[..m]);
            n += m;
        }
        assert_eq!(n, data_size);
        assert_eq!(data, &out[..]);
    }
}

View File

@@ -1,18 +1,20 @@
use super::BitrotReader;
use super::Erasure;
use crate::disk::error::Error;
use crate::disk::error_reduce::reduce_errs;
use futures::future::join_all;
use pin_project_lite::pin_project;
use rustfs_rio::BitrotReader;
use std::io;
use std::io::ErrorKind;
use tokio::io::AsyncRead;
use tokio::io::AsyncWrite;
use tokio::io::AsyncWriteExt;
use tracing::error;
pin_project! {
pub(crate) struct ParallelReader {
pub(crate) struct ParallelReader<R> {
#[pin]
readers: Vec<Option<BitrotReader>>,
readers: Vec<Option<BitrotReader<R>>>,
offset: usize,
shard_size: usize,
shard_file_size: usize,
@@ -21,9 +23,12 @@ pub(crate) struct ParallelReader {
}
}
impl ParallelReader {
impl<R> ParallelReader<R>
where
R: AsyncRead + Unpin + Send + Sync,
{
// readers传入前应处理disk错误确保每个reader达到可用数量的BitrotReader
pub fn new(readers: Vec<Option<BitrotReader>>, e: Erasure, offset: usize, total_length: usize) -> Self {
pub fn new(readers: Vec<Option<BitrotReader<R>>>, e: Erasure, offset: usize, total_length: usize) -> Self {
let shard_size = e.shard_size();
let shard_file_size = e.shard_file_size(total_length);
@@ -42,7 +47,10 @@ impl ParallelReader {
}
}
impl ParallelReader {
impl<R> ParallelReader<R>
where
R: AsyncRead + Unpin + Send + Sync,
{
pub async fn read(&mut self) -> (Vec<Option<Vec<u8>>>, Vec<Option<Error>>) {
// if self.readers.len() != self.total_shards {
// return Err(io::Error::new(ErrorKind::InvalidInput, "Invalid number of readers"));
@@ -175,16 +183,17 @@ where
}
impl Erasure {
pub async fn decode<W>(
pub async fn decode<W, R>(
&self,
writer: &mut W,
readers: Vec<Option<BitrotReader>>,
readers: Vec<Option<BitrotReader<R>>>,
offset: usize,
length: usize,
total_length: usize,
) -> (usize, Option<std::io::Error>)
where
W: tokio::io::AsyncWrite + Send + Sync + Unpin + 'static,
W: AsyncWrite + Send + Sync + Unpin,
R: AsyncRead + Unpin + Send + Sync,
{
if readers.len() != self.data_shards + self.parity_shards {
return (0, Some(io::Error::new(ErrorKind::InvalidInput, "Invalid number of readers")));

View File

@@ -1,24 +1,22 @@
use bytes::Bytes;
use rustfs_rio::BitrotWriter;
use rustfs_rio::Reader;
// use std::io::Cursor;
// use std::mem;
use super::BitrotWriterWrapper;
use super::Erasure;
use crate::disk::error::Error;
use crate::disk::error_reduce::count_errs;
use crate::disk::error_reduce::{OBJECT_OP_IGNORED_ERRS, reduce_write_quorum_errs};
use bytes::Bytes;
use std::sync::Arc;
use std::vec;
use tokio::io::AsyncRead;
use tokio::sync::mpsc;
pub(crate) struct MultiWriter<'a> {
writers: &'a mut [Option<BitrotWriter>],
writers: &'a mut [Option<BitrotWriterWrapper>],
write_quorum: usize,
errs: Vec<Option<Error>>,
}
impl<'a> MultiWriter<'a> {
pub fn new(writers: &'a mut [Option<BitrotWriter>], write_quorum: usize) -> Self {
pub fn new(writers: &'a mut [Option<BitrotWriterWrapper>], write_quorum: usize) -> Self {
let length = writers.len();
MultiWriter {
writers,
@@ -82,11 +80,11 @@ impl Erasure {
pub async fn encode<R>(
self: Arc<Self>,
mut reader: R,
writers: &mut [Option<BitrotWriter>],
writers: &mut [Option<BitrotWriterWrapper>],
quorum: usize,
) -> std::io::Result<(R, usize)>
where
R: Reader + Send + Sync + Unpin + 'static,
R: AsyncRead + Send + Sync + Unpin + 'static,
{
let (tx, mut rx) = mpsc::channel::<Vec<Bytes>>(8);

View File

@@ -11,16 +11,16 @@
//! - **Compatibility**: Works with any shard size
//! - **Use case**: Default behavior, recommended for most production use cases
//!
//! ### Hybrid Mode (`reed-solomon-simd` feature)
//! - **Performance**: Uses SIMD optimization when possible, falls back to erasure implementation for small shards
//! - **Compatibility**: Works with any shard size through intelligent fallback
//! - **Reliability**: Best of both worlds - SIMD speed for large data, erasure stability for small data
//! ### SIMD Mode (`reed-solomon-simd` feature)
//! - **Performance**: Uses SIMD optimization for high-performance encoding/decoding
//! - **Compatibility**: Works with any shard size through SIMD implementation
//! - **Reliability**: High-performance SIMD implementation for large data processing
//! - **Use case**: Use when maximum performance is needed for large data processing
//!
//! ## Feature Flags
//!
//! - Default: Use pure reed-solomon-erasure implementation (stable and reliable)
//! - `reed-solomon-simd`: Use hybrid mode (SIMD + erasure fallback for optimal performance)
//! - `reed-solomon-simd`: Use SIMD mode for optimal performance
//! - `reed-solomon-erasure`: Explicitly enable pure erasure mode (same as default)
//!
//! ## Example
@@ -38,25 +38,23 @@ use bytes::{Bytes, BytesMut};
use reed_solomon_erasure::galois_8::ReedSolomon as ReedSolomonErasure;
#[cfg(feature = "reed-solomon-simd")]
use reed_solomon_simd;
// use rustfs_rio::Reader;
use smallvec::SmallVec;
use std::io;
use tokio::io::AsyncRead;
use tracing::warn;
use uuid::Uuid;
/// Reed-Solomon encoder variants supporting different implementations.
#[allow(clippy::large_enum_variant)]
pub enum ReedSolomonEncoder {
/// Hybrid mode: SIMD with erasure fallback (when reed-solomon-simd feature is enabled)
/// SIMD mode: High-performance SIMD implementation (when reed-solomon-simd feature is enabled)
#[cfg(feature = "reed-solomon-simd")]
Hybrid {
SIMD {
data_shards: usize,
parity_shards: usize,
// 使用RwLock确保线程安全实现Send + Sync
encoder_cache: std::sync::RwLock<Option<reed_solomon_simd::ReedSolomonEncoder>>,
decoder_cache: std::sync::RwLock<Option<reed_solomon_simd::ReedSolomonDecoder>>,
// erasure fallback for small shards or SIMD failures
fallback_encoder: Box<ReedSolomonErasure>,
},
/// Pure erasure mode: default and when reed-solomon-erasure feature is specified
Erasure(Box<ReedSolomonErasure>),
@@ -66,18 +64,16 @@ impl Clone for ReedSolomonEncoder {
fn clone(&self) -> Self {
match self {
#[cfg(feature = "reed-solomon-simd")]
ReedSolomonEncoder::Hybrid {
ReedSolomonEncoder::SIMD {
data_shards,
parity_shards,
fallback_encoder,
..
} => ReedSolomonEncoder::Hybrid {
} => ReedSolomonEncoder::SIMD {
data_shards: *data_shards,
parity_shards: *parity_shards,
// 为新实例创建空的缓存,不共享缓存
encoder_cache: std::sync::RwLock::new(None),
decoder_cache: std::sync::RwLock::new(None),
fallback_encoder: fallback_encoder.clone(),
},
ReedSolomonEncoder::Erasure(encoder) => ReedSolomonEncoder::Erasure(encoder.clone()),
}
@@ -89,18 +85,12 @@ impl ReedSolomonEncoder {
pub fn new(data_shards: usize, parity_shards: usize) -> io::Result<Self> {
#[cfg(feature = "reed-solomon-simd")]
{
// Hybrid mode: SIMD + erasure fallback when reed-solomon-simd feature is enabled
let fallback_encoder = Box::new(
ReedSolomonErasure::new(data_shards, parity_shards)
.map_err(|e| io::Error::other(format!("Failed to create fallback erasure encoder: {:?}", e)))?,
);
Ok(ReedSolomonEncoder::Hybrid {
// SIMD mode when reed-solomon-simd feature is enabled
Ok(ReedSolomonEncoder::SIMD {
data_shards,
parity_shards,
encoder_cache: std::sync::RwLock::new(None),
decoder_cache: std::sync::RwLock::new(None),
fallback_encoder,
})
}
@@ -117,11 +107,10 @@ impl ReedSolomonEncoder {
pub fn encode(&self, shards: SmallVec<[&mut [u8]; 16]>) -> io::Result<()> {
match self {
#[cfg(feature = "reed-solomon-simd")]
ReedSolomonEncoder::Hybrid {
ReedSolomonEncoder::SIMD {
data_shards,
parity_shards,
encoder_cache,
fallback_encoder,
..
} => {
let mut shards_vec: Vec<&mut [u8]> = shards.into_vec();
@@ -129,30 +118,14 @@ impl ReedSolomonEncoder {
return Ok(());
}
let shard_len = shards_vec[0].len();
// SIMD 性能最佳的最小 shard 大小 (通常 512-1024 字节)
const SIMD_MIN_SHARD_SIZE: usize = 512;
// 如果 shard 太小,直接使用 fallback encoder
if shard_len < SIMD_MIN_SHARD_SIZE {
let fallback_shards: SmallVec<[&mut [u8]; 16]> = SmallVec::from_vec(shards_vec);
return fallback_encoder
.encode(fallback_shards)
.map_err(|e| io::Error::other(format!("Fallback erasure encode error: {:?}", e)));
}
// 尝试使用 SIMD如果失败则回退到 fallback
// 使用 SIMD 进行编码
let simd_result = self.encode_with_simd(*data_shards, *parity_shards, encoder_cache, &mut shards_vec);
match simd_result {
Ok(()) => Ok(()),
Err(simd_error) => {
warn!("SIMD encoding failed: {}, using fallback", simd_error);
let fallback_shards: SmallVec<[&mut [u8]; 16]> = SmallVec::from_vec(shards_vec);
fallback_encoder
.encode(fallback_shards)
.map_err(|e| io::Error::other(format!("Fallback erasure encode error: {:?}", e)))
warn!("SIMD encoding failed: {}", simd_error);
Err(simd_error)
}
}
}
@@ -231,39 +204,20 @@ impl ReedSolomonEncoder {
pub fn reconstruct(&self, shards: &mut [Option<Vec<u8>>]) -> io::Result<()> {
match self {
#[cfg(feature = "reed-solomon-simd")]
ReedSolomonEncoder::Hybrid {
ReedSolomonEncoder::SIMD {
data_shards,
parity_shards,
decoder_cache,
fallback_encoder,
..
} => {
// Find a valid shard to determine length
let shard_len = shards
.iter()
.find_map(|s| s.as_ref().map(|v| v.len()))
.ok_or_else(|| io::Error::other("No valid shards found for reconstruction"))?;
// SIMD 性能最佳的最小 shard 大小
const SIMD_MIN_SHARD_SIZE: usize = 512;
// 如果 shard 太小,直接使用 fallback encoder
if shard_len < SIMD_MIN_SHARD_SIZE {
return fallback_encoder
.reconstruct(shards)
.map_err(|e| io::Error::other(format!("Fallback erasure reconstruct error: {:?}", e)));
}
// 尝试使用 SIMD如果失败则回退到 fallback
// 使用 SIMD 进行重构
let simd_result = self.reconstruct_with_simd(*data_shards, *parity_shards, decoder_cache, shards);
match simd_result {
Ok(()) => Ok(()),
Err(simd_error) => {
warn!("SIMD reconstruction failed: {}, using fallback", simd_error);
fallback_encoder
.reconstruct(shards)
.map_err(|e| io::Error::other(format!("Fallback erasure reconstruct error: {:?}", e)))
warn!("SIMD reconstruction failed: {}", simd_error);
Err(simd_error)
}
}
}
@@ -402,6 +356,10 @@ impl Clone for Erasure {
}
}
pub fn calc_shard_size(block_size: usize, data_shards: usize) -> usize {
(block_size.div_ceil(data_shards) + 1) & !1
}
impl Erasure {
/// Create a new Erasure instance.
///
@@ -439,7 +397,7 @@ impl Erasure {
// let total_size = shard_size * self.total_shard_count();
// 数据切片数量
let per_shard_size = data.len().div_ceil(self.data_shards);
let per_shard_size = calc_shard_size(data.len(), self.data_shards);
// 总需求大小
let need_total_size = per_shard_size * self.total_shard_count();
@@ -507,7 +465,7 @@ impl Erasure {
/// Calculate the size of each shard.
pub fn shard_size(&self) -> usize {
self.block_size.div_ceil(self.data_shards)
calc_shard_size(self.block_size, self.data_shards)
}
/// Calculate the total erasure file size for a given original size.
// Returns the final erasure size from the original size
@@ -518,7 +476,7 @@ impl Erasure {
let num_shards = total_length / self.block_size;
let last_block_size = total_length % self.block_size;
let last_shard_size = last_block_size.div_ceil(self.data_shards);
let last_shard_size = calc_shard_size(last_block_size, self.data_shards);
num_shards * self.shard_size() + last_shard_size
}
@@ -536,22 +494,29 @@ impl Erasure {
till_offset
}
/// Encode all data from a rustfs_rio::Reader in blocks, calling an async callback for each encoded block.
/// This method is async and returns the reader and total bytes read after all blocks are processed.
/// Encode all data from a reader in blocks, calling an async callback for each encoded block.
/// This method is async and returns the total bytes read after all blocks are processed.
///
/// # Arguments
/// * `reader` - A rustfs_rio::Reader to read data from.
/// * `mut on_block` - Async callback: FnMut(Result<Vec<Bytes>, std::io::Error>) -> Future<Output=Result<(), E>> + Send
/// * `reader` - An async reader implementing AsyncRead + Send + Sync + Unpin
/// * `mut on_block` - Async callback that receives encoded blocks and returns a Result
/// * `F` - Callback type: FnMut(Result<Vec<Bytes>, std::io::Error>) -> Future<Output=Result<(), E>> + Send
/// * `Fut` - Future type returned by the callback
/// * `E` - Error type returned by the callback
/// * `R` - Reader type implementing AsyncRead + Send + Sync + Unpin
///
/// # Returns
/// Result<(reader, total_bytes_read), E> after all data has been processed or on callback error.
/// Result<usize, E> containing total bytes read, or error from callback
///
/// # Errors
/// Returns error if reading from reader fails or if callback returns error
pub async fn encode_stream_callback_async<F, Fut, E, R>(
self: std::sync::Arc<Self>,
reader: &mut R,
mut on_block: F,
) -> Result<usize, E>
where
R: rustfs_rio::Reader + Send + Sync + Unpin,
R: AsyncRead + Send + Sync + Unpin,
F: FnMut(std::io::Result<Vec<Bytes>>) -> Fut + Send,
Fut: std::future::Future<Output = Result<(), E>> + Send,
{
@@ -582,6 +547,7 @@ impl Erasure {
#[cfg(test)]
mod tests {
use super::*;
#[test]
@@ -603,9 +569,9 @@ mod tests {
// Case 5: total_length > block_size, aligned
assert_eq!(erasure.shard_file_size(16), 4); // 16/8=2, last=0, 2*2+0=4
assert_eq!(erasure.shard_file_size(1248739), 312185); // 1248739/8=156092, last=3, 3 div_ceil 4=1, 156092*2+1=312185
assert_eq!(erasure.shard_file_size(1248739), 312186); // 1248739/8=156092, last=3, 3 div_ceil 4=1, 156092*2+1=312185
assert_eq!(erasure.shard_file_size(43), 11); // 43/8=5, last=3, 3 div_ceil 4=1, 5*2+1=11
assert_eq!(erasure.shard_file_size(43), 12); // 43/8=5, last=3, 3 div_ceil 4=1, 5*2+1=11
}
#[test]
@@ -617,7 +583,7 @@ mod tests {
#[cfg(not(feature = "reed-solomon-simd"))]
let block_size = 8; // Pure erasure mode (default)
#[cfg(feature = "reed-solomon-simd")]
let block_size = 1024; // Hybrid mode - SIMD with fallback
let block_size = 1024; // SIMD mode - SIMD with fallback
let erasure = Erasure::new(data_shards, parity_shards, block_size);
@@ -625,7 +591,7 @@ mod tests {
#[cfg(not(feature = "reed-solomon-simd"))]
let test_data = b"hello world".to_vec(); // Small data for erasure (default)
#[cfg(feature = "reed-solomon-simd")]
let test_data = b"Hybrid mode test data for encoding and decoding roundtrip verification with sufficient length to ensure shard size requirements are met for proper SIMD optimization.".repeat(20); // ~3KB for hybrid
let test_data = b"SIMD mode test data for encoding and decoding roundtrip verification with sufficient length to ensure shard size requirements are met for proper SIMD optimization.".repeat(20); // ~3KB for SIMD
let data = &test_data;
let encoded_shards = erasure.encode_data(data).unwrap();
@@ -655,7 +621,7 @@ mod tests {
// Use different block sizes based on feature
#[cfg(feature = "reed-solomon-simd")]
let block_size = 512 * 3; // Hybrid mode - SIMD with fallback
let block_size = 512 * 3; // SIMD mode
#[cfg(not(feature = "reed-solomon-simd"))]
let block_size = 8192; // Pure erasure mode (default)
@@ -700,7 +666,7 @@ mod tests {
#[test]
fn test_shard_size_and_file_size() {
let erasure = Erasure::new(4, 2, 8);
assert_eq!(erasure.shard_file_size(33), 9);
assert_eq!(erasure.shard_file_size(33), 10);
assert_eq!(erasure.shard_file_size(0), 0);
}
@@ -722,7 +688,7 @@ mod tests {
// Use different block sizes based on feature
#[cfg(feature = "reed-solomon-simd")]
let block_size = 1024; // Hybrid mode
let block_size = 1024; // SIMD mode
#[cfg(not(feature = "reed-solomon-simd"))]
let block_size = 8; // Pure erasure mode (default)
@@ -732,12 +698,12 @@ mod tests {
let data =
b"Async error test data with sufficient length to meet requirements for proper testing and validation.".repeat(20); // ~2KB
let mut rio_reader = Cursor::new(data);
let mut reader = Cursor::new(data);
let (tx, mut rx) = mpsc::channel::<Vec<Bytes>>(8);
let erasure_clone = erasure.clone();
let handle = tokio::spawn(async move {
erasure_clone
.encode_stream_callback_async::<_, _, (), _>(&mut rio_reader, move |res| {
.encode_stream_callback_async::<_, _, (), _>(&mut reader, move |res| {
let tx = tx.clone();
async move {
let shards = res.unwrap();
@@ -765,7 +731,7 @@ mod tests {
// Use different block sizes based on feature
#[cfg(feature = "reed-solomon-simd")]
let block_size = 1024; // Hybrid mode
let block_size = 1024; // SIMD mode
#[cfg(not(feature = "reed-solomon-simd"))]
let block_size = 8; // Pure erasure mode (default)
@@ -779,12 +745,12 @@ mod tests {
// let data = b"callback".to_vec(); // 8 bytes to fit exactly in one 8-byte block
let data_clone = data.clone(); // Clone for later comparison
let mut rio_reader = Cursor::new(data);
let mut reader = Cursor::new(data);
let (tx, mut rx) = mpsc::channel::<Vec<Bytes>>(8);
let erasure_clone = erasure.clone();
let handle = tokio::spawn(async move {
erasure_clone
.encode_stream_callback_async::<_, _, (), _>(&mut rio_reader, move |res| {
.encode_stream_callback_async::<_, _, (), _>(&mut reader, move |res| {
let tx = tx.clone();
async move {
let shards = res.unwrap();
@@ -816,20 +782,20 @@ mod tests {
assert_eq!(&recovered, &data_clone);
}
// Tests specifically for hybrid mode (SIMD + erasure fallback)
// Tests specifically for SIMD mode
#[cfg(feature = "reed-solomon-simd")]
mod hybrid_tests {
mod simd_tests {
use super::*;
#[test]
fn test_hybrid_encode_decode_roundtrip() {
fn test_simd_encode_decode_roundtrip() {
let data_shards = 4;
let parity_shards = 2;
let block_size = 1024; // Use larger block size for hybrid mode
let block_size = 1024; // Use larger block size for SIMD mode
let erasure = Erasure::new(data_shards, parity_shards, block_size);
// Use data that will create shards >= 512 bytes for SIMD optimization
let test_data = b"Hybrid test data for encoding and decoding roundtrip verification with sufficient length to ensure shard size requirements are met for proper SIMD optimization and validation.";
let test_data = b"SIMD mode test data for encoding and decoding roundtrip verification with sufficient length to ensure shard size requirements are met for proper SIMD optimization and validation.";
let data = test_data.repeat(25); // Create much larger data: ~5KB total, ~1.25KB per shard
let encoded_shards = erasure.encode_data(&data).unwrap();
@@ -854,10 +820,10 @@ mod tests {
}
#[test]
fn test_hybrid_all_zero_data() {
fn test_simd_all_zero_data() {
let data_shards = 4;
let parity_shards = 2;
let block_size = 1024; // Use larger block size for hybrid mode
let block_size = 1024; // Use larger block size for SIMD mode
let erasure = Erasure::new(data_shards, parity_shards, block_size);
// Create all-zero data that ensures adequate shard size for SIMD optimization
@@ -997,39 +963,35 @@ mod tests {
}
#[test]
fn test_simd_smart_fallback() {
fn test_simd_small_data_handling() {
let data_shards = 4;
let parity_shards = 2;
let block_size = 32; // 很小的block_size会导致小shard
let block_size = 32; // Small block size for testing edge cases
let erasure = Erasure::new(data_shards, parity_shards, block_size);
// 使用小数据每个shard只有8字节远小于512字节SIMD最小要求
let small_data = b"tiny!123".to_vec(); // 8字节数据
// Use small data to test SIMD handling of small shards
let small_data = b"tiny!123".to_vec(); // 8 bytes data
// 应该能够成功编码通过fallback
// Test encoding with small data
let result = erasure.encode_data(&small_data);
match result {
Ok(shards) => {
println!(
"✅ Smart fallback worked: encoded {} bytes into {} shards",
small_data.len(),
shards.len()
);
println!("✅ SIMD encoding succeeded: {} bytes into {} shards", small_data.len(), shards.len());
assert_eq!(shards.len(), data_shards + parity_shards);
// 测试解码
// Test decoding
let mut shards_opt: Vec<Option<Vec<u8>>> = shards.iter().map(|shard| Some(shard.to_vec())).collect();
// 丢失一些shard来测试恢复
shards_opt[1] = None; // 丢失一个数据shard
shards_opt[4] = None; // 丢失一个奇偶shard
// Lose some shards to test recovery
shards_opt[1] = None; // Lose one data shard
shards_opt[4] = None; // Lose one parity shard
let decode_result = erasure.decode_data(&mut shards_opt);
match decode_result {
Ok(()) => {
println!("✅ Smart fallback decode worked");
println!("✅ SIMD decode worked");
// 验证恢复的数据
// Verify recovered data
let mut recovered = Vec::new();
for shard in shards_opt.iter().take(data_shards) {
recovered.extend_from_slice(shard.as_ref().unwrap());
@@ -1038,17 +1000,17 @@ mod tests {
println!("recovered: {:?}", recovered);
println!("small_data: {:?}", small_data);
assert_eq!(&recovered, &small_data);
println!("✅ Data recovery successful with smart fallback");
println!("✅ Data recovery successful with SIMD");
}
Err(e) => {
println!("❌ Smart fallback decode failed: {}", e);
// 对于很小的数据如果decode失败也是可以接受的
println!("❌ SIMD decode failed: {}", e);
// For very small data, decode failure might be acceptable
}
}
}
Err(e) => {
println!("❌ Smart fallback encode failed: {}", e);
// 如果连fallback都失败了说明数据太小或配置有问题
println!("❌ SIMD encode failed: {}", e);
// For very small data or configuration issues, encoding might fail
}
}
}
@@ -1143,14 +1105,14 @@ mod tests {
let test_data = b"SIMD stream processing test with sufficient data length for multiple blocks and proper SIMD optimization verification!";
let data = test_data.repeat(5); // Create owned Vec<u8>
let data_clone = data.clone(); // Clone for later comparison
let mut rio_reader = Cursor::new(data);
let mut reader = Cursor::new(data);
let (tx, mut rx) = mpsc::channel::<Vec<Bytes>>(16);
let erasure_clone = erasure.clone();
let handle = tokio::spawn(async move {
erasure_clone
.encode_stream_callback_async::<_, _, (), _>(&mut rio_reader, move |res| {
.encode_stream_callback_async::<_, _, (), _>(&mut reader, move |res| {
let tx = tx.clone();
async move {
let shards = res.unwrap();

View File

@@ -1,19 +1,23 @@
use super::BitrotReader;
use super::BitrotWriterWrapper;
use super::decode::ParallelReader;
use crate::disk::error::{Error, Result};
use crate::erasure_coding::encode::MultiWriter;
use bytes::Bytes;
use rustfs_rio::BitrotReader;
use rustfs_rio::BitrotWriter;
use tokio::io::AsyncRead;
use tracing::info;
impl super::Erasure {
pub async fn heal(
pub async fn heal<R>(
&self,
writers: &mut [Option<BitrotWriter>],
readers: Vec<Option<BitrotReader>>,
writers: &mut [Option<BitrotWriterWrapper>],
readers: Vec<Option<BitrotReader<R>>>,
total_length: usize,
_prefer: &[bool],
) -> Result<()> {
) -> Result<()>
where
R: AsyncRead + Unpin + Send + Sync,
{
info!(
"Erasure heal, writers len: {}, readers len: {}, total_length: {}",
writers.len(),

View File

@@ -3,4 +3,7 @@ pub mod encode;
pub mod erasure;
pub mod heal;
mod bitrot;
pub use bitrot::*;
pub use erasure::{Erasure, ReedSolomonEncoder};

View File

@@ -1,5 +1,5 @@
pub mod admin_server_info;
// pub mod bitrot;
pub mod bitrot;
pub mod bucket;
pub mod cache_value;
mod chunk_stream;

View File

@@ -1,9 +1,11 @@
use crate::bitrot::{create_bitrot_reader, create_bitrot_writer};
use crate::disk::error_reduce::{OBJECT_OP_IGNORED_ERRS, reduce_read_quorum_errs, reduce_write_quorum_errs};
use crate::disk::{
self, CHECK_PART_DISK_NOT_FOUND, CHECK_PART_FILE_CORRUPT, CHECK_PART_FILE_NOT_FOUND, CHECK_PART_SUCCESS,
conv_part_err_to_int, has_part_err,
};
use crate::erasure_coding;
use crate::erasure_coding::bitrot_verify;
use crate::error::{Error, Result};
use crate::global::GLOBAL_MRFState;
use crate::heal::data_usage_cache::DataUsageCache;
@@ -64,7 +66,7 @@ use rustfs_filemeta::{
FileInfo, FileMeta, FileMetaShallowVersion, MetaCacheEntries, MetaCacheEntry, MetadataResolutionParams, ObjectPartInfo,
RawFileInfo, file_info_from_raw, merge_file_meta_versions,
};
use rustfs_rio::{BitrotReader, BitrotWriter, EtagResolvable, HashReader, Writer, bitrot_verify};
use rustfs_rio::{EtagResolvable, HashReader};
use rustfs_utils::HashAlgorithm;
use sha2::{Digest, Sha256};
use std::hash::Hash;
@@ -1865,51 +1867,31 @@ impl SetDisks {
let mut readers = Vec::with_capacity(disks.len());
let mut errors = Vec::with_capacity(disks.len());
for (idx, disk_op) in disks.iter().enumerate() {
if let Some(inline_data) = files[idx].data.clone() {
let rd = Cursor::new(inline_data);
let reader = BitrotReader::new(Box::new(rd), erasure.shard_size(), HashAlgorithm::HighwayHash256);
readers.push(Some(reader));
errors.push(None);
} else if let Some(disk) = disk_op {
// Calculate ceiling division of till_offset by shard_size
let till_offset =
till_offset.div_ceil(erasure.shard_size()) * HashAlgorithm::HighwayHash256.size() + till_offset;
let rd = disk
.read_file_stream(
bucket,
&format!("{}/{}/part.{}", object, files[idx].data_dir.unwrap_or(Uuid::nil()), part_number),
part_offset,
till_offset,
)
.await?;
let reader = BitrotReader::new(
Box::new(rustfs_rio::WarpReader::new(rd)),
erasure.shard_size(),
HashAlgorithm::HighwayHash256,
);
readers.push(Some(reader));
errors.push(None);
} else {
errors.push(Some(DiskError::DiskNotFound));
readers.push(None);
match create_bitrot_reader(
files[idx].data.as_deref(),
disk_op.as_ref(),
bucket,
&format!("{}/{}/part.{}", object, files[idx].data_dir.unwrap_or_default(), part_number),
part_offset,
till_offset,
erasure.shard_size(),
HashAlgorithm::HighwayHash256,
)
.await
{
Ok(Some(reader)) => {
readers.push(Some(reader));
errors.push(None);
}
Ok(None) => {
readers.push(None);
errors.push(Some(DiskError::DiskNotFound));
}
Err(e) => {
readers.push(None);
errors.push(Some(e));
}
}
// if let Some(disk) = disk_op {
// let checksum_info = files[idx].erasure.get_checksum_info(part_number);
// let reader = new_bitrot_filereader(
// disk.clone(),
// files[idx].data.clone(),
// bucket.to_owned(),
// format!("{}/{}/part.{}", object, files[idx].data_dir.unwrap_or(Uuid::nil()), part_number),
// till_offset,
// checksum_info.algorithm,
// erasure.shard_size(erasure.block_size),
// );
// readers.push(Some(reader));
// } else {
// readers.push(None)
// }
}
let nil_count = errors.iter().filter(|&e| e.is_none()).count();
@@ -2478,59 +2460,31 @@ impl SetDisks {
let mut prefer = vec![false; latest_disks.len()];
for (index, disk) in latest_disks.iter().enumerate() {
if let (Some(disk), Some(metadata)) = (disk, &copy_parts_metadata[index]) {
// let filereader = {
// if let Some(ref data) = metadata.data {
// Box::new(BufferReader::new(data.clone()))
// } else {
// let disk = disk.clone();
// let part_path = format!("{}/{}/part.{}", object, src_data_dir, part.number);
// disk.read_file(bucket, &part_path).await?
// }
// };
// let reader = new_bitrot_filereader(
// disk.clone(),
// metadata.data.clone(),
// bucket.to_owned(),
// format!("{}/{}/part.{}", object, src_data_dir, part.number),
// till_offset,
// checksum_algo.clone(),
// erasure.shard_size(erasure.block_size),
// );
if let Some(ref data) = metadata.data {
let rd = Cursor::new(data.clone());
let reader =
BitrotReader::new(Box::new(rd), erasure.shard_size(), checksum_algo.clone());
readers.push(Some(reader));
// errors.push(None);
} else {
let length =
till_offset.div_ceil(erasure.shard_size()) * checksum_algo.size() + till_offset;
let rd = match disk
.read_file_stream(
bucket,
&format!("{}/{}/part.{}", object, src_data_dir, part.number),
0,
length,
)
.await
{
Ok(rd) => rd,
Err(e) => {
// errors.push(Some(e.into()));
error!("heal_object read_file err: {:?}", e);
writers.push(None);
continue;
}
};
let reader = BitrotReader::new(
Box::new(rustfs_rio::WarpReader::new(rd)),
erasure.shard_size(),
HashAlgorithm::HighwayHash256,
);
readers.push(Some(reader));
// errors.push(None);
match create_bitrot_reader(
metadata.data.as_deref(),
Some(disk),
bucket,
&format!("{}/{}/part.{}", object, src_data_dir, part.number),
0,
till_offset,
erasure.shard_size(),
checksum_algo.clone(),
)
.await
{
Ok(Some(reader)) => {
readers.push(Some(reader));
}
Ok(None) => {
error!("heal_object disk not available");
readers.push(None);
continue;
}
Err(e) => {
error!("heal_object read_file err: {:?}", e);
readers.push(None);
continue;
}
}
prefer[index] = disk.host_name().is_empty();
@@ -2549,55 +2503,67 @@ impl SetDisks {
};
for disk in out_dated_disks.iter() {
if let Some(disk) = disk {
// let filewriter = {
// if is_inline_buffer {
// Box::new(Cursor::new(Vec::new()))
// } else {
// let disk = disk.clone();
// let part_path = format!("{}/{}/part.{}", tmp_id, dst_data_dir, part.number);
// disk.create_file("", RUSTFS_META_TMP_BUCKET, &part_path, 0).await?
// }
// };
let writer = create_bitrot_writer(
is_inline_buffer,
disk.as_ref(),
RUSTFS_META_TMP_BUCKET,
&format!("{}/{}/part.{}", tmp_id, dst_data_dir, part.number),
erasure.shard_file_size(part.size),
erasure.shard_size(),
HashAlgorithm::HighwayHash256,
)
.await?;
writers.push(Some(writer));
if is_inline_buffer {
let writer = BitrotWriter::new(
Writer::from_cursor(Cursor::new(Vec::new())),
erasure.shard_size(),
HashAlgorithm::HighwayHash256,
);
writers.push(Some(writer));
} else {
let f = disk
.create_file(
"",
RUSTFS_META_TMP_BUCKET,
&format!("{}/{}/part.{}", tmp_id, dst_data_dir, part.number),
0,
)
.await?;
let writer = BitrotWriter::new(
Writer::from_tokio_writer(f),
erasure.shard_size(),
HashAlgorithm::HighwayHash256,
);
writers.push(Some(writer));
}
// if let Some(disk) = disk {
// // let filewriter = {
// // if is_inline_buffer {
// // Box::new(Cursor::new(Vec::new()))
// // } else {
// // let disk = disk.clone();
// // let part_path = format!("{}/{}/part.{}", tmp_id, dst_data_dir, part.number);
// // disk.create_file("", RUSTFS_META_TMP_BUCKET, &part_path, 0).await?
// // }
// // };
// let writer = new_bitrot_filewriter(
// disk.clone(),
// RUSTFS_META_TMP_BUCKET,
// format!("{}/{}/part.{}", tmp_id, dst_data_dir, part.number).as_str(),
// is_inline_buffer,
// DEFAULT_BITROT_ALGO,
// erasure.shard_size(erasure.block_size),
// )
// .await?;
// if is_inline_buffer {
// let writer = BitrotWriter::new(
// Writer::from_cursor(Cursor::new(Vec::new())),
// erasure.shard_size(),
// HashAlgorithm::HighwayHash256,
// );
// writers.push(Some(writer));
// } else {
// let f = disk
// .create_file(
// "",
// RUSTFS_META_TMP_BUCKET,
// &format!("{}/{}/part.{}", tmp_id, dst_data_dir, part.number),
// 0,
// )
// .await?;
// let writer = BitrotWriter::new(
// Writer::from_tokio_writer(f),
// erasure.shard_size(),
// HashAlgorithm::HighwayHash256,
// );
// writers.push(Some(writer));
// }
// writers.push(Some(writer));
} else {
writers.push(None);
}
// // let writer = new_bitrot_filewriter(
// // disk.clone(),
// // RUSTFS_META_TMP_BUCKET,
// // format!("{}/{}/part.{}", tmp_id, dst_data_dir, part.number).as_str(),
// // is_inline_buffer,
// // DEFAULT_BITROT_ALGO,
// // erasure.shard_size(erasure.block_size),
// // )
// // .await?;
// // writers.push(Some(writer));
// } else {
// writers.push(None);
// }
}
// Heal each part. erasure.Heal() will write the healed
@@ -2630,8 +2596,7 @@ impl SetDisks {
// if let Some(w) = writer.as_any().downcast_ref::<BitrotFileWriter>() {
// parts_metadata[index].data = Some(w.inline_data().to_vec());
// }
parts_metadata[index].data =
Some(writer.into_inner().into_cursor_inner().unwrap_or_default());
parts_metadata[index].data = Some(writer.into_inline_data().unwrap_or_default());
}
parts_metadata[index].set_inline_data();
} else {
@@ -3882,27 +3847,38 @@ impl ObjectIO for SetDisks {
let mut errors = Vec::with_capacity(shuffle_disks.len());
for disk_op in shuffle_disks.iter() {
if let Some(disk) = disk_op {
let writer = if is_inline_buffer {
BitrotWriter::new(
Writer::from_cursor(Cursor::new(Vec::new())),
erasure.shard_size(),
HashAlgorithm::HighwayHash256,
)
} else {
let f = match disk
.create_file("", RUSTFS_META_TMP_BUCKET, &tmp_object, erasure.shard_file_size(data.content_length))
.await
{
Ok(f) => f,
Err(e) => {
errors.push(Some(e));
writers.push(None);
continue;
}
};
let writer = create_bitrot_writer(
is_inline_buffer,
Some(disk),
RUSTFS_META_TMP_BUCKET,
&tmp_object,
erasure.shard_file_size(data.content_length),
erasure.shard_size(),
HashAlgorithm::HighwayHash256,
)
.await?;
BitrotWriter::new(Writer::from_tokio_writer(f), erasure.shard_size(), HashAlgorithm::HighwayHash256)
};
// let writer = if is_inline_buffer {
// BitrotWriter::new(
// Writer::from_cursor(Cursor::new(Vec::new())),
// erasure.shard_size(),
// HashAlgorithm::HighwayHash256,
// )
// } else {
// let f = match disk
// .create_file("", RUSTFS_META_TMP_BUCKET, &tmp_object, erasure.shard_file_size(data.content_length))
// .await
// {
// Ok(f) => f,
// Err(e) => {
// errors.push(Some(e));
// writers.push(None);
// continue;
// }
// };
// BitrotWriter::new(Writer::from_tokio_writer(f), erasure.shard_size(), HashAlgorithm::HighwayHash256)
// };
writers.push(Some(writer));
errors.push(None);
@@ -3952,7 +3928,7 @@ impl ObjectIO for SetDisks {
for (i, fi) in parts_metadatas.iter_mut().enumerate() {
if is_inline_buffer {
if let Some(writer) = writers[i].take() {
fi.data = Some(writer.into_inner().into_cursor_inner().unwrap_or_default());
fi.data = Some(writer.into_inline_data().unwrap_or_default());
}
}
@@ -4553,21 +4529,32 @@ impl StorageAPI for SetDisks {
let mut errors = Vec::with_capacity(shuffle_disks.len());
for disk_op in shuffle_disks.iter() {
if let Some(disk) = disk_op {
let writer = {
let f = match disk
.create_file("", RUSTFS_META_TMP_BUCKET, &tmp_part_path, erasure.shard_file_size(data.content_length))
.await
{
Ok(f) => f,
Err(e) => {
errors.push(Some(e));
writers.push(None);
continue;
}
};
let writer = create_bitrot_writer(
false,
Some(disk),
RUSTFS_META_TMP_BUCKET,
&tmp_part_path,
erasure.shard_file_size(data.content_length),
erasure.shard_size(),
HashAlgorithm::HighwayHash256,
)
.await?;
BitrotWriter::new(Writer::from_tokio_writer(f), erasure.shard_size(), HashAlgorithm::HighwayHash256)
};
// let writer = {
// let f = match disk
// .create_file("", RUSTFS_META_TMP_BUCKET, &tmp_part_path, erasure.shard_file_size(data.content_length))
// .await
// {
// Ok(f) => f,
// Err(e) => {
// errors.push(Some(e));
// writers.push(None);
// continue;
// }
// };
// BitrotWriter::new(Writer::from_tokio_writer(f), erasure.shard_size(), HashAlgorithm::HighwayHash256)
// };
writers.push(Some(writer));
errors.push(None);
@@ -6079,7 +6066,6 @@ mod tests {
// Test object directory dangling detection
let errs = vec![Some(DiskError::FileNotFound), Some(DiskError::FileNotFound), None];
assert!(is_object_dir_dang_ling(&errs));
let errs2 = vec![None, None, None];
assert!(!is_object_dir_dang_ling(&errs2));