fix bitrot

weisd
2025-06-18 18:22:07 +08:00
parent 85421657ad
commit 5b1b527851
12 changed files with 177 additions and 129 deletions

View File

@@ -46,7 +46,7 @@ pin_project! {
impl HttpReader {
pub async fn new(url: String, method: Method, headers: HeaderMap, body: Option<Vec<u8>>) -> io::Result<Self> {
http_log!("[HttpReader::new] url: {url}, method: {method:?}, headers: {headers:?}");
// http_log!("[HttpReader::new] url: {url}, method: {method:?}, headers: {headers:?}");
Self::with_capacity(url, method, headers, body, 0).await
}
/// Create a new HttpReader from a URL. The request is performed immediately.
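Note: this commit silences the hot-path http_log! calls by commenting them out at every call site. The macro's definition is not part of this diff; a hypothetical alternative, assuming the crate depends on tracing, would gate the expansion behind a cargo feature so the call sites could stay in place at zero runtime cost:

// Hypothetical sketch only; not the crate's actual macro definition.
#[cfg(feature = "http-logs")]
macro_rules! http_log {
    ($($arg:tt)*) => { tracing::debug!($($arg)*) };
}

#[cfg(not(feature = "http-logs"))]
macro_rules! http_log {
    ($($arg:tt)*) => {};
}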
@@ -57,10 +57,10 @@ impl HttpReader {
body: Option<Vec<u8>>,
_read_buf_size: usize,
) -> io::Result<Self> {
http_log!(
"[HttpReader::with_capacity] url: {url}, method: {method:?}, headers: {headers:?}, buf_size: {}",
_read_buf_size
);
// http_log!(
// "[HttpReader::with_capacity] url: {url}, method: {method:?}, headers: {headers:?}, buf_size: {}",
// _read_buf_size
// );
// First, check if the connection is available (HEAD)
let client = get_http_client();
let head_resp = client.head(&url).headers(headers.clone()).send().await;
@@ -119,12 +119,12 @@ impl HttpReader {
impl AsyncRead for HttpReader {
fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<std::io::Result<()>> {
http_log!(
"[HttpReader::poll_read] url: {}, method: {:?}, buf.remaining: {}",
self.url,
self.method,
buf.remaining()
);
// http_log!(
// "[HttpReader::poll_read] url: {}, method: {:?}, buf.remaining: {}",
// self.url,
// self.method,
// buf.remaining()
// );
// Read from the inner stream
Pin::new(&mut self.inner).poll_read(cx, buf)
}
@@ -157,20 +157,20 @@ impl Stream for ReceiverStream {
type Item = Result<Bytes, std::io::Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let poll = Pin::new(&mut self.receiver).poll_recv(cx);
match &poll {
Poll::Ready(Some(Some(bytes))) => {
http_log!("[ReceiverStream] poll_next: got {} bytes", bytes.len());
}
Poll::Ready(Some(None)) => {
http_log!("[ReceiverStream] poll_next: sender shutdown");
}
Poll::Ready(None) => {
http_log!("[ReceiverStream] poll_next: channel closed");
}
Poll::Pending => {
// http_log!("[ReceiverStream] poll_next: pending");
}
}
// match &poll {
// Poll::Ready(Some(Some(bytes))) => {
// // http_log!("[ReceiverStream] poll_next: got {} bytes", bytes.len());
// }
// Poll::Ready(Some(None)) => {
// // http_log!("[ReceiverStream] poll_next: sender shutdown");
// }
// Poll::Ready(None) => {
// // http_log!("[ReceiverStream] poll_next: channel closed");
// }
// Poll::Pending => {
// // http_log!("[ReceiverStream] poll_next: pending");
// }
// }
match poll {
Poll::Ready(Some(Some(bytes))) => Poll::Ready(Some(Ok(bytes))),
Poll::Ready(Some(None)) => Poll::Ready(None), // Sender shutdown
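Note: the channel item type here is Option<Bytes>, so poll_recv yields three distinct outcomes: Some(Some(bytes)) is a body chunk, Some(None) is the explicit end-of-body marker sent later by poll_shutdown, and an outer None means the sender was dropped. A minimal sketch of the same protocol, assuming a tokio mpsc::Receiver<Option<Bytes>>:

use bytes::Bytes;
use tokio::sync::mpsc;

// Minimal sketch of the three outcomes the match above distinguishes.
async fn next_chunk(rx: &mut mpsc::Receiver<Option<Bytes>>) -> Option<Bytes> {
    match rx.recv().await {
        Some(Some(bytes)) => Some(bytes), // a data chunk
        Some(None) => None,               // explicit end-of-body marker
        None => None,                     // sender dropped; channel closed
    }
}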
@@ -196,7 +196,7 @@ pin_project! {
impl HttpWriter {
/// Create a new HttpWriter for the given URL. The HTTP request is performed in the background.
pub async fn new(url: String, method: Method, headers: HeaderMap) -> io::Result<Self> {
http_log!("[HttpWriter::new] url: {url}, method: {method:?}, headers: {headers:?}");
// http_log!("[HttpWriter::new] url: {url}, method: {method:?}, headers: {headers:?}");
let url_clone = url.clone();
let method_clone = method.clone();
let headers_clone = headers.clone();
@@ -206,13 +206,13 @@ impl HttpWriter {
let resp = client.put(&url).headers(headers.clone()).body(Vec::new()).send().await;
match resp {
Ok(resp) => {
http_log!("[HttpWriter::new] empty PUT status: {}", resp.status());
// http_log!("[HttpWriter::new] empty PUT status: {}", resp.status());
if !resp.status().is_success() {
return Err(Error::other(format!("Empty PUT failed: status {}", resp.status())));
}
}
Err(e) => {
http_log!("[HttpWriter::new] empty PUT error: {e}");
// http_log!("[HttpWriter::new] empty PUT error: {e}");
return Err(Error::other(format!("Empty PUT failed: {e}")));
}
}
@@ -223,9 +223,9 @@ impl HttpWriter {
let handle = tokio::spawn(async move {
let stream = ReceiverStream { receiver };
let body = reqwest::Body::wrap_stream(stream);
http_log!(
"[HttpWriter::spawn] sending HTTP request: url={url_clone}, method={method_clone:?}, headers={headers_clone:?}"
);
// http_log!(
// "[HttpWriter::spawn] sending HTTP request: url={url_clone}, method={method_clone:?}, headers={headers_clone:?}"
// );
let client = get_http_client();
let request = client
@@ -238,7 +238,7 @@ impl HttpWriter {
match response {
Ok(resp) => {
http_log!("[HttpWriter::spawn] got response: status={}", resp.status());
// http_log!("[HttpWriter::spawn] got response: status={}", resp.status());
if !resp.status().is_success() {
let _ = err_tx.send(Error::other(format!(
"HttpWriter HTTP request failed with non-200 status {}",
@@ -248,17 +248,17 @@ impl HttpWriter {
}
}
Err(e) => {
http_log!("[HttpWriter::spawn] HTTP request error: {e}");
// http_log!("[HttpWriter::spawn] HTTP request error: {e}");
let _ = err_tx.send(Error::other(format!("HTTP request failed: {}", e)));
return Err(Error::other(format!("HTTP request failed: {}", e)));
}
}
http_log!("[HttpWriter::spawn] HTTP request completed, exiting");
// http_log!("[HttpWriter::spawn] HTTP request completed, exiting");
Ok(())
});
http_log!("[HttpWriter::new] connection established successfully");
// http_log!("[HttpWriter::new] connection established successfully");
Ok(Self {
url,
method,
@@ -285,12 +285,12 @@ impl HttpWriter {
impl AsyncWrite for HttpWriter {
fn poll_write(mut self: Pin<&mut Self>, _cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
http_log!(
"[HttpWriter::poll_write] url: {}, method: {:?}, buf.len: {}",
self.url,
self.method,
buf.len()
);
// http_log!(
// "[HttpWriter::poll_write] url: {}, method: {:?}, buf.len: {}",
// self.url,
// self.method,
// buf.len()
// );
if let Ok(e) = Pin::new(&mut self.err_rx).try_recv() {
return Poll::Ready(Err(e));
}
@@ -307,12 +307,19 @@ impl AsyncWrite for HttpWriter {
}
fn poll_shutdown(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
// let url = self.url.clone();
// let method = self.method.clone();
if !self.finish {
http_log!("[HttpWriter::poll_shutdown] url: {}, method: {:?}", self.url, self.method);
// http_log!("[HttpWriter::poll_shutdown] url: {}, method: {:?}", url, method);
self.sender
.try_send(None)
.map_err(|e| Error::other(format!("HttpWriter shutdown error: {}", e)))?;
http_log!("[HttpWriter::poll_shutdown] sent shutdown signal to HTTP request");
// http_log!(
// "[HttpWriter::poll_shutdown] sent shutdown signal to HTTP request, url: {}, method: {:?}",
// url,
// method
// );
self.finish = true;
}
@@ -320,13 +327,18 @@ impl AsyncWrite for HttpWriter {
use futures::FutureExt;
match Pin::new(&mut self.get_mut().handle).poll_unpin(_cx) {
Poll::Ready(Ok(_)) => {
http_log!("[HttpWriter::poll_shutdown] HTTP request finished successfully");
// http_log!(
// "[HttpWriter::poll_shutdown] HTTP request finished successfully, url: {}, method: {:?}",
// url,
// method
// );
}
Poll::Ready(Err(e)) => {
http_log!("[HttpWriter::poll_shutdown] HTTP request failed: {e}");
// http_log!("[HttpWriter::poll_shutdown] HTTP request failed: {e}, url: {}, method: {:?}", url, method);
return Poll::Ready(Err(Error::other(format!("HTTP request failed: {}", e))));
}
Poll::Pending => {
// http_log!("[HttpWriter::poll_shutdown] HTTP request pending, url: {}, method: {:?}", url, method);
return Poll::Pending;
}
}
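Note: the overall shape of HttpWriter is a channel-backed streaming upload: poll_write pushes chunks into an mpsc sender, a spawned task wraps the receiving half into a request body, and poll_shutdown sends the end marker and then awaits the JoinHandle. A condensed, hypothetical sketch of that wiring (illustrative names; assumes reqwest with its stream feature and the tokio-stream crate):

use bytes::Bytes;
use futures::StreamExt;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;

// Condensed sketch of a channel-backed upload body (illustrative only).
async fn streaming_put(url: &str) -> reqwest::Result<reqwest::Response> {
    let (tx, rx) = mpsc::channel::<Bytes>(16);
    // reqwest wants a Stream<Item = Result<Bytes, E>>; map the chunks.
    let body = reqwest::Body::wrap_stream(ReceiverStream::new(rx).map(Ok::<_, std::io::Error>));
    tokio::spawn(async move {
        let _ = tx.send(Bytes::from_static(b"chunk")).await;
        // Dropping tx here closes the stream and finishes the body.
    });
    reqwest::Client::new().put(url).body(body).send().await
}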

View File

@@ -28,7 +28,7 @@ pub async fn create_bitrot_reader(
checksum_algo: HashAlgorithm,
) -> disk::error::Result<Option<BitrotReader<Box<dyn AsyncRead + Send + Sync + Unpin>>>> {
// Calculate the total length to read, including the checksum overhead
let length = offset.div_ceil(shard_size) * checksum_algo.size() + length;
let length = length.div_ceil(shard_size) * checksum_algo.size() + length;
if let Some(data) = inline_data {
// Use inline data
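Note: this hunk is the bitrot fix proper. Each shard is stored as checksum || data, so the checksum overhead to read depends on how many shards the requested length spans; the old code derived the shard count from offset instead, miscounting whenever offset and length disagreed. A small illustrative check of the corrected formula (the values are made up):

// Illustrative only: 1 MiB shards, 32-byte checksums.
fn total_read_len(length: usize, shard_size: usize, checksum_size: usize) -> usize {
    length.div_ceil(shard_size) * checksum_size + length
}

fn main() {
    let (shard, sum) = (1024 * 1024usize, 32usize);
    // 3 full shards plus 10 bytes spills into a 4th shard: 4 checksums.
    assert_eq!(total_read_len(3 * shard + 10, shard, sum), 3 * shard + 10 + 4 * sum);
}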

View File

@@ -1399,8 +1399,6 @@ impl DiskAPI for LocalDisk {
#[tracing::instrument(level = "debug", skip(self))]
async fn create_file(&self, origvolume: &str, volume: &str, path: &str, _file_size: i64) -> Result<FileWriter> {
// warn!("disk create_file: origvolume: {}, volume: {}, path: {}", origvolume, volume, path);
if !origvolume.is_empty() {
let origvolume_dir = self.get_bucket_path(origvolume)?;
if !skip_access_checks(origvolume) {
@@ -1431,8 +1429,6 @@ impl DiskAPI for LocalDisk {
#[tracing::instrument(level = "debug", skip(self))]
// async fn append_file(&self, volume: &str, path: &str, mut r: DuplexStream) -> Result<File> {
async fn append_file(&self, volume: &str, path: &str) -> Result<FileWriter> {
warn!("disk append_file: volume: {}, path: {}", volume, path);
let volume_dir = self.get_bucket_path(volume)?;
if !skip_access_checks(volume) {
access(&volume_dir)
@@ -1497,7 +1493,9 @@ impl DiskAPI for LocalDisk {
return Err(DiskError::FileCorrupt);
}
f.seek(SeekFrom::Start(offset as u64)).await?;
if offset > 0 {
f.seek(SeekFrom::Start(offset as u64)).await?;
}
Ok(Box::new(f))
}
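Note: the conditional avoids a redundant syscall; a freshly opened file already sits at position 0, so seeking there does nothing. A minimal sketch of the pattern, assuming tokio's File:

use std::io::SeekFrom;
use tokio::fs::File;
use tokio::io::AsyncSeekExt;

// Skip the seek syscall when the requested offset is the start of file.
async fn open_at(path: &str, offset: u64) -> std::io::Result<File> {
    let mut f = File::open(path).await?;
    if offset > 0 {
        f.seek(SeekFrom::Start(offset)).await?;
    }
    Ok(f)
}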

View File

@@ -608,7 +608,14 @@ impl DiskAPI for RemoteDisk {
#[tracing::instrument(level = "debug", skip(self))]
async fn read_file_stream(&self, volume: &str, path: &str, offset: usize, length: usize) -> Result<FileReader> {
info!("read_file_stream {}/{}/{}", self.endpoint.to_string(), volume, path);
// warn!(
// "disk remote read_file_stream {}/{}/{} offset={} length={}",
// self.endpoint.to_string(),
// volume,
// path,
// offset,
// length
// );
let url = format!(
"{}/rustfs/rpc/read_file_stream?disk={}&volume={}&path={}&offset={}&length={}",
self.endpoint.grid_host(),
@@ -641,7 +648,13 @@ impl DiskAPI for RemoteDisk {
#[tracing::instrument(level = "debug", skip(self))]
async fn create_file(&self, _origvolume: &str, volume: &str, path: &str, file_size: i64) -> Result<FileWriter> {
info!("create_file {}/{}/{}", self.endpoint.to_string(), volume, path);
// warn!(
// "disk remote create_file {}/{}/{} file_size={}",
// self.endpoint.to_string(),
// volume,
// path,
// file_size
// );
let url = format!(
"{}/rustfs/rpc/put_file_stream?disk={}&volume={}&path={}&append={}&size={}",

View File

@@ -1,7 +1,9 @@
use bytes::Bytes;
use pin_project_lite::pin_project;
use rustfs_utils::{HashAlgorithm, read_full, write_all};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite};
use rustfs_utils::HashAlgorithm;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tracing::error;
use uuid::Uuid;
pin_project! {
/// BitrotReader reads (hash+data) blocks from an async reader and verifies hash integrity.
@@ -12,10 +14,11 @@ pin_project! {
shard_size: usize,
buf: Vec<u8>,
hash_buf: Vec<u8>,
hash_read: usize,
data_buf: Vec<u8>,
data_read: usize,
hash_checked: bool,
// hash_read: usize,
// data_buf: Vec<u8>,
// data_read: usize,
// hash_checked: bool,
id: Uuid,
}
}
@@ -32,10 +35,11 @@ where
shard_size,
buf: Vec::new(),
hash_buf: vec![0u8; hash_size],
hash_read: 0,
data_buf: Vec::new(),
data_read: 0,
hash_checked: false,
// hash_read: 0,
// data_buf: Vec::new(),
// data_read: 0,
// hash_checked: false,
id: Uuid::new_v4(),
}
}
@@ -51,30 +55,31 @@ where
let hash_size = self.hash_algo.size();
// Read hash
let mut hash_buf = vec![0u8; hash_size];
if hash_size > 0 {
self.inner.read_exact(&mut hash_buf).await?;
self.inner.read_exact(&mut self.hash_buf).await.map_err(|e| {
error!("bitrot reader read hash error: {}", e);
e
})?;
}
let data_len = read_full(&mut self.inner, out).await?;
// // Read data
// let mut data_len = 0;
// while data_len < out.len() {
// let n = self.inner.read(&mut out[data_len..]).await?;
// if n == 0 {
// break;
// }
// data_len += n;
// // Only read up to one shard_size block
// if data_len >= self.shard_size {
// break;
// }
// }
// Read data
let mut data_len = 0;
while data_len < out.len() {
let n = self.inner.read(&mut out[data_len..]).await.map_err(|e| {
error!("bitrot reader read data error: {}", e);
e
})?;
if n == 0 {
break;
}
data_len += n;
}
if hash_size > 0 {
let actual_hash = self.hash_algo.hash_encode(&out[..data_len]);
if actual_hash.as_ref() != hash_buf.as_slice() {
if actual_hash.as_ref() != self.hash_buf.as_slice() {
error!("bitrot reader hash mismatch, id={} data_len={}, out_len={}", self.id, data_len, out.len());
return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "bitrot hash mismatch"));
}
}
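Note: two details matter in the rewritten read path: the stored checksum is read first with read_exact, and the data is read in a loop because AsyncReadExt::read may legally return fewer bytes than requested. A self-contained sketch of the (hash || data) framing, with the hash function passed in as a stand-in for the configured HashAlgorithm:

use tokio::io::{AsyncRead, AsyncReadExt};

// Sketch of verifying one (hash || data) shard; `hash` is a stand-in.
async fn read_shard<R: AsyncRead + Unpin>(
    r: &mut R,
    out: &mut [u8],
    hash_size: usize,
    hash: impl Fn(&[u8]) -> Vec<u8>,
) -> std::io::Result<usize> {
    let mut expected = vec![0u8; hash_size];
    r.read_exact(&mut expected).await?; // checksum precedes the data
    let mut n = 0;
    while n < out.len() {
        let m = r.read(&mut out[n..]).await?; // short reads are legal
        if m == 0 {
            break; // EOF
        }
        n += m;
    }
    if hash(&out[..n]) != expected {
        return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "bitrot hash mismatch"));
    }
    Ok(n)
}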
@@ -145,22 +150,20 @@ where
self.buf.extend_from_slice(buf);
// Write hash+data in one call
let mut n = write_all(&mut self.inner, &self.buf).await?;
self.inner.write_all(&self.buf).await?;
if n < hash_algo.size() {
return Err(std::io::Error::new(
std::io::ErrorKind::WriteZero,
"short write: not enough bytes written",
));
}
self.inner.flush().await?;
n -= hash_algo.size();
let n = self.buf.len();
self.buf.clear();
Ok(n)
}
pub async fn shutdown(&mut self) -> std::io::Result<()> {
self.inner.shutdown().await
}
}
pub fn bitrot_shard_file_size(size: usize, shard_size: usize, algo: HashAlgorithm) -> usize {
@@ -330,6 +333,10 @@ impl BitrotWriterWrapper {
self.bitrot_writer.write(buf).await
}
pub async fn shutdown(&mut self) -> std::io::Result<()> {
self.bitrot_writer.shutdown().await
}
/// Extract the inline buffer data, consuming the wrapper
pub fn into_inline_data(self) -> Option<Vec<u8>> {
match self.writer_type {

View File

@@ -67,36 +67,34 @@ where
}
// Read all shards concurrently
let mut read_futs = Vec::with_capacity(self.readers.len());
let read_futs: Vec<_> = self
.readers
.iter_mut()
.enumerate()
.map(|(i, opt_reader)| {
if let Some(reader) = opt_reader.as_mut() {
for (i, opt_reader) in self.readers.iter_mut().enumerate() {
let future = if let Some(reader) = opt_reader.as_mut() {
Box::pin(async move {
let mut buf = vec![0u8; shard_size];
// Need to move i and buf into the async block
Some(async move {
match reader.read(&mut buf).await {
Ok(n) => {
buf.truncate(n);
(i, Ok(buf))
}
Err(e) => (i, Err(Error::from(e))),
match reader.read(&mut buf).await {
Ok(n) => {
buf.truncate(n);
(i, Ok(buf))
}
})
} else {
None
}
})
.collect();
Err(e) => (i, Err(Error::from(e))),
}
}) as std::pin::Pin<Box<dyn std::future::Future<Output = (usize, Result<Vec<u8>, Error>)> + Send>>
} else {
// When the reader is None, return a FileNotFound error
Box::pin(async move { (i, Err(Error::FileNotFound)) })
as std::pin::Pin<Box<dyn std::future::Future<Output = (usize, Result<Vec<u8>, Error>)> + Send>>
};
read_futs.push(future);
}
// Filter out the Nones, then join_all
let mut results = join_all(read_futs.into_iter().flatten()).await;
let results = join_all(read_futs).await;
let mut shards: Vec<Option<Vec<u8>>> = vec![None; self.readers.len()];
let mut errs = vec![None; self.readers.len()];
for (i, shard) in results.drain(..) {
for (i, shard) in results.into_iter() {
match shard {
Ok(data) => {
if !data.is_empty() {
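Note: the refactor above replaces "skip the None readers" with "give every slot a future". Boxing both arms into the same trait-object future type lets them live in one Vec, and join_all then returns a result for every index, including FileNotFound for missing readers. A reduced sketch of the pattern:

use futures::future::join_all;
use std::{future::Future, pin::Pin};

type BoxFut<T> = Pin<Box<dyn Future<Output = T> + Send>>;

// Reduced sketch: a missing slot contributes an error at its index
// instead of silently disappearing from the results.
async fn read_all(slots: Vec<Option<u8>>) -> Vec<(usize, Result<u8, String>)> {
    let futs: Vec<BoxFut<(usize, Result<u8, String>)>> = slots
        .into_iter()
        .enumerate()
        .map(|(i, slot)| -> BoxFut<(usize, Result<u8, String>)> {
            match slot {
                Some(v) => Box::pin(async move { (i, Ok(v)) }),
                None => Box::pin(async move { (i, Err("missing".to_string())) }),
            }
        })
        .collect();
    join_all(futs).await
}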

View File

@@ -97,6 +97,13 @@ impl<'a> MultiWriter<'a> {
.join(", ")
)))
}
pub async fn _shutdown(&mut self) -> std::io::Result<()> {
for writer in self.writers.iter_mut().flatten() {
writer.shutdown().await?;
}
Ok(())
}
}
impl Erasure {
@@ -147,7 +154,7 @@ impl Erasure {
}
let (reader, total) = task.await??;
// writers.shutdown().await?;
Ok((reader, total))
}
}

View File

@@ -555,6 +555,13 @@ mod tests {
use super::*;
#[test]
fn test_shard_file_size_cases2() {
let erasure = Erasure::new(12, 4, 1024 * 1024);
assert_eq!(erasure.shard_file_size(1572864), 131074);
}
#[test]
fn test_shard_file_size_cases() {
let erasure = Erasure::new(4, 2, 8);
@@ -577,6 +584,8 @@ mod tests {
assert_eq!(erasure.shard_file_size(1248739), 312186); // 1248739/8=156092, last=3
assert_eq!(erasure.shard_file_size(43), 12); // 43/8=5, last=3
assert_eq!(erasure.shard_file_size(1572864), 393216); // 1572864/8=196608, last=0, 196608*2=393216
}
#[test]
@@ -677,9 +686,14 @@ mod tests {
#[test]
fn test_shard_file_offset() {
let erasure = Erasure::new(4, 2, 8);
let offset = erasure.shard_file_offset(0, 16, 32);
let erasure = Erasure::new(8, 8, 1024 * 1024);
let offset = erasure.shard_file_offset(0, 86, 86);
println!("offset={}", offset);
assert!(offset > 0);
let total_length = erasure.shard_file_size(86);
println!("total_length={}", total_length);
assert!(total_length > 0);
}
#[tokio::test]

View File

@@ -243,7 +243,7 @@ impl ECStore {
return Err(err);
}
error!("rebalanceMeta: not found, rebalance not started");
warn!("rebalanceMeta: not found, rebalance not started");
}
}
@@ -501,7 +501,7 @@ impl ECStore {
if let Some(meta) = rebalance_meta.as_mut() {
meta.cancel = Some(tx)
} else {
error!("start_rebalance: rebalance_meta is None exit");
warn!("start_rebalance: rebalance_meta is None exit");
return;
}

View File

@@ -1859,8 +1859,6 @@ impl SetDisks {
let (last_part_index, _) = fi.to_part_offset(end_offset)?;
// let erasure = Erasure::new(fi.erasure.data_blocks, fi.erasure.parity_blocks, fi.erasure.block_size);
let erasure = erasure_coding::Erasure::new(fi.erasure.data_blocks, fi.erasure.parity_blocks, fi.erasure.block_size);
let mut total_readed = 0;

View File

@@ -91,6 +91,7 @@ use tokio_stream::wrappers::ReceiverStream;
use tokio_tar::Archive;
use tokio_util::io::ReaderStream;
use tokio_util::io::StreamReader;
use tracing::debug;
use tracing::error;
use tracing::info;
use tracing::warn;
@@ -1787,7 +1788,7 @@ impl S3 for FS {
let object_lock_configuration = match metadata_sys::get_object_lock_config(&bucket).await {
Ok((cfg, _created)) => Some(cfg),
Err(err) => {
warn!("get_object_lock_config err {:?}", err);
debug!("get_object_lock_config err {:?}", err);
None
}
};

View File

@@ -9,14 +9,14 @@ UNZIP_TARGET="./"
SERVER_LIST=(
"root@172.23.215.2" # node1
"root@172.23.215.4" # node2
"root@172.23.215.7" # node3
"root@172.23.215.3" # node4
"root@172.23.215.8" # node5
"root@172.23.215.5" # node6
"root@172.23.215.9" # node7
"root@172.23.215.6" # node8
"root@node1" # node1
"root@node2" # node2
"root@node3" # node3
"root@node4" # node4
# "root@node5" # node5
# "root@node6" # node6
# "root@node7" # node7
# "root@node8" # node8
)
REMOTE_TMP="~/rustfs"