Refactor: Introduce content checksums and improve multipart/object metadata handling (#671)

* feat: adapt to s3s typed etag support

* refactor: move replication struct to rustfs_filemeta, fix filemeta transition bug

* add head_object checksum, filter object metadata output

* fix multipart checksum

* fix multipart checksum

* add content md5, sha256 check

* fix test

* fix cargo

---------

Co-authored-by: overtrue <anzhengchao@gmail.com>
This commit is contained in:
weisd
2025-10-20 23:46:13 +08:00
committed by GitHub
parent 46797dc815
commit cd1e244c68
56 changed files with 3331 additions and 810 deletions

View File

@@ -50,7 +50,7 @@
//! let diskable_md5 = false;
//!
//! // Method 1: Simple creation (recommended for most cases)
//! let hash_reader = HashReader::new(reader, size, actual_size, etag.clone(), diskable_md5).unwrap();
//! let hash_reader = HashReader::new(reader, size, actual_size, etag.clone(), None, diskable_md5).unwrap();
//!
//! // Method 2: With manual wrapping to recreate original logic
//! let reader2 = BufReader::new(Cursor::new(&data[..]));
@@ -71,7 +71,7 @@
//! // No wrapping needed
//! reader2
//! };
//! let hash_reader2 = HashReader::new(wrapped_reader, size, actual_size, etag, diskable_md5).unwrap();
//! let hash_reader2 = HashReader::new(wrapped_reader, size, actual_size, etag.clone(), None, diskable_md5).unwrap();
//! # });
//! ```
//!
@@ -88,28 +88,43 @@
//! # tokio_test::block_on(async {
//! let data = b"test";
//! let reader = BufReader::new(Cursor::new(&data[..]));
//! let hash_reader = HashReader::new(Box::new(WarpReader::new(reader)), 4, 4, None, false).unwrap();
//! let hash_reader = HashReader::new(Box::new(WarpReader::new(reader)), 4, 4, None, None,false).unwrap();
//!
//! // Check if a type is a HashReader
//! assert!(hash_reader.is_hash_reader());
//!
//! // Use new for compatibility (though it's simpler to use new() directly)
//! let reader2 = BufReader::new(Cursor::new(&data[..]));
//! let result = HashReader::new(Box::new(WarpReader::new(reader2)), 4, 4, None, false);
//! let result = HashReader::new(Box::new(WarpReader::new(reader2)), 4, 4, None, None, false);
//! assert!(result.is_ok());
//! # });
//! ```
use crate::Checksum;
use crate::ChecksumHasher;
use crate::ChecksumType;
use crate::Sha256Hasher;
use crate::compress_index::{Index, TryGetIndex};
use crate::get_content_checksum;
use crate::{EtagReader, EtagResolvable, HardLimitReader, HashReaderDetector, Reader, WarpReader};
use base64::Engine;
use base64::engine::general_purpose;
use http::HeaderMap;
use pin_project_lite::pin_project;
use s3s::TrailingHeaders;
use std::collections::HashMap;
use std::io::Cursor;
use std::io::Write;
use std::mem;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, ReadBuf};
use crate::compress_index::{Index, TryGetIndex};
use crate::{EtagReader, EtagResolvable, HardLimitReader, HashReaderDetector, Reader};
use tracing::error;
/// Trait for mutable operations on HashReader
pub trait HashReaderMut {
fn into_inner(self) -> Box<dyn Reader>;
fn take_inner(&mut self) -> Box<dyn Reader>;
fn bytes_read(&self) -> u64;
fn checksum(&self) -> &Option<String>;
fn set_checksum(&mut self, checksum: Option<String>);
@@ -117,6 +132,10 @@ pub trait HashReaderMut {
fn set_size(&mut self, size: i64);
fn actual_size(&self) -> i64;
fn set_actual_size(&mut self, actual_size: i64);
fn content_hash(&self) -> &Option<Checksum>;
fn content_sha256(&self) -> &Option<String>;
fn get_trailer(&self) -> Option<&TrailingHeaders>;
fn set_trailer(&mut self, trailer: Option<TrailingHeaders>);
}
pin_project! {
@@ -129,7 +148,14 @@ pin_project! {
pub actual_size: i64,
pub diskable_md5: bool,
bytes_read: u64,
// TODO: content_hash
content_hash: Option<Checksum>,
content_hasher: Option<Box<dyn ChecksumHasher>>,
content_sha256: Option<String>,
content_sha256_hasher: Option<Sha256Hasher>,
checksum_on_finish: bool,
trailer_s3s: Option<TrailingHeaders>,
}
}
@@ -139,7 +165,8 @@ impl HashReader {
mut inner: Box<dyn Reader>,
size: i64,
actual_size: i64,
md5: Option<String>,
md5hex: Option<String>,
sha256hex: Option<String>,
diskable_md5: bool,
) -> std::io::Result<Self> {
// Check if it's already a HashReader and update its parameters
@@ -152,7 +179,7 @@ impl HashReader {
}
if let Some(checksum) = existing_hash_reader.checksum() {
if let Some(ref md5) = md5 {
if let Some(ref md5) = md5hex {
if checksum != md5 {
return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "HashReader checksum mismatch"));
}
@@ -166,7 +193,7 @@ impl HashReader {
));
}
existing_hash_reader.set_checksum(md5.clone());
existing_hash_reader.set_checksum(md5hex.clone());
if existing_hash_reader.size() < 0 && size >= 0 {
existing_hash_reader.set_size(size);
@@ -176,13 +203,29 @@ impl HashReader {
existing_hash_reader.set_actual_size(actual_size);
}
let size = existing_hash_reader.size();
let actual_size = existing_hash_reader.actual_size();
let content_hash = existing_hash_reader.content_hash().clone();
let content_hasher = existing_hash_reader
.content_hash()
.clone()
.map(|hash| hash.checksum_type.hasher().unwrap());
let content_sha256 = existing_hash_reader.content_sha256().clone();
let content_sha256_hasher = existing_hash_reader.content_sha256().clone().map(|_| Sha256Hasher::new());
let inner = existing_hash_reader.take_inner();
return Ok(Self {
inner,
size,
checksum: md5,
checksum: md5hex.clone(),
actual_size,
diskable_md5,
bytes_read: 0,
content_sha256,
content_sha256_hasher,
content_hash,
content_hasher,
checksum_on_finish: false,
trailer_s3s: existing_hash_reader.get_trailer().cloned(),
});
}
@@ -190,23 +233,33 @@ impl HashReader {
let hr = HardLimitReader::new(inner, size);
inner = Box::new(hr);
if !diskable_md5 && !inner.is_hash_reader() {
let er = EtagReader::new(inner, md5.clone());
let er = EtagReader::new(inner, md5hex.clone());
inner = Box::new(er);
}
} else if !diskable_md5 {
let er = EtagReader::new(inner, md5.clone());
let er = EtagReader::new(inner, md5hex.clone());
inner = Box::new(er);
}
Ok(Self {
inner,
size,
checksum: md5,
checksum: md5hex,
actual_size,
diskable_md5,
bytes_read: 0,
content_hash: None,
content_hasher: None,
content_sha256: sha256hex.clone(),
content_sha256_hasher: sha256hex.clone().map(|_| Sha256Hasher::new()),
checksum_on_finish: false,
trailer_s3s: None,
})
}
/// Consumes the `HashReader` and returns the wrapped inner reader.
pub fn into_inner(self) -> Box<dyn Reader> {
    self.inner
}
/// Update HashReader parameters
pub fn update_params(&mut self, size: i64, actual_size: i64, etag: Option<String>) {
if self.size < 0 && size >= 0 {
@@ -228,9 +281,112 @@ impl HashReader {
pub fn actual_size(&self) -> i64 {
self.actual_size
}
/// Extracts the declared content checksum from the S3 request headers and
/// arms this reader to verify it while the body is streamed.
///
/// If the checksum is declared as *trailing*, the `trailing_headers` handle
/// is stored so the expected value can be read once EOF is reached.
///
/// With `ignore_value` set, the headers are still parsed (so malformed
/// checksum headers are rejected) but no verification state is installed.
///
/// # Errors
/// Returns an error when the checksum headers are malformed or the checksum
/// type has no hasher implementation.
pub fn add_checksum_from_s3s(
    &mut self,
    headers: &HeaderMap,
    trailing_headers: Option<TrailingHeaders>,
    ignore_value: bool,
) -> Result<(), std::io::Error> {
    // Always parse, even when the value is ignored, to surface header errors.
    let cs = get_content_checksum(headers)?;
    if ignore_value {
        return Ok(());
    }
    if let Some(checksum) = cs {
        if checksum.checksum_type.trailing() {
            // Move (not clone) the handle: it is not used again below.
            self.trailer_s3s = trailing_headers;
        }
        // `add_non_trailing_checksum` records `content_hash` and installs the
        // hasher itself, so the previous duplicate assignment here was redundant.
        // `ignore_value` is known to be false at this point.
        return self.add_non_trailing_checksum(Some(checksum), false);
    }
    Ok(())
}
/// Extracts the declared content checksum from the request headers (no
/// trailing-checksum support) and arms this reader to verify it.
///
/// # Errors
/// Returns an error when the checksum headers are malformed or the checksum
/// type has no hasher implementation.
pub fn add_checksum_no_trailer(&mut self, header: &HeaderMap, ignore_value: bool) -> Result<(), std::io::Error> {
    let cs = get_content_checksum(header)?;
    if let Some(checksum) = cs {
        // `add_non_trailing_checksum` records `content_hash` itself, so the
        // extra `self.content_hash = Some(...)` the previous version performed
        // here was redundant.
        return self.add_non_trailing_checksum(Some(checksum), ignore_value);
    }
    Ok(())
}
/// Records `checksum` as the expected content checksum and, unless
/// `ignore_value` is set, installs the matching hasher used to verify the
/// streamed bytes.
///
/// # Errors
/// Returns `InvalidData` when the checksum type has no hasher implementation.
pub fn add_non_trailing_checksum(&mut self, checksum: Option<Checksum>, ignore_value: bool) -> Result<(), std::io::Error> {
    let Some(checksum) = checksum else {
        return Ok(());
    };
    // The expected value is recorded even when verification is skipped.
    self.content_hash = Some(checksum.clone());
    if ignore_value {
        return Ok(());
    }
    match checksum.checksum_type.hasher() {
        Some(hasher) => {
            self.content_hasher = Some(hasher);
            Ok(())
        }
        None => Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid checksum type")),
    }
}
/// Returns the configured content checksum, or `None` when no checksum is
/// set, its type is unset, or the value is invalid.
pub fn checksum(&self) -> Option<Checksum> {
    self.content_hash
        .as_ref()
        .filter(|cs| cs.checksum_type.is_set() && cs.valid())
        .cloned()
}
/// Returns the declared checksum algorithm, if any.
pub fn content_crc_type(&self) -> Option<ChecksumType> {
    let checksum = self.content_hash.as_ref()?;
    Some(checksum.checksum_type)
}
/// Returns the content checksum as a `{type-name: encoded-value}` map.
///
/// Yields an empty map when no checksum is configured, the checksum is
/// invalid, or its type is `NONE`. For trailing checksums the value is read
/// from the stored trailing headers (missing trailer ⇒ empty map).
pub fn content_crc(&self) -> HashMap<String, String> {
    let mut result = HashMap::new();

    let Some(checksum) = self.content_hash.as_ref() else {
        return result;
    };
    if !checksum.valid() || checksum.checksum_type.is(ChecksumType::NONE) {
        return result;
    }

    let key = checksum.checksum_type.to_string();
    if !checksum.checksum_type.trailing() {
        result.insert(key, checksum.encoded.clone());
        return result;
    }

    // Trailing checksum: the value only exists in the trailing headers.
    if let Some(trailer) = self.trailer_s3s.as_ref() {
        let found = trailer.read(|headers| {
            headers
                .get(checksum.checksum_type.to_string())
                .and_then(|value| value.to_str().ok().map(String::from))
        });
        if let Some(Some(value)) = found {
            result.insert(key, value);
        }
    }
    result
}
}
impl HashReaderMut for HashReader {
// Moves the wrapped reader out, consuming the HashReader.
fn into_inner(self) -> Box<dyn Reader> {
    self.inner
}
// Takes ownership of the wrapped reader without consuming `self`.
fn take_inner(&mut self) -> Box<dyn Reader> {
    // Replace inner with an empty reader to move it out safely while keeping self valid
    mem::replace(&mut self.inner, Box::new(WarpReader::new(Cursor::new(Vec::new()))))
}
// Number of bytes read through this reader so far.
fn bytes_read(&self) -> u64 {
    self.bytes_read
}
@@ -258,22 +414,105 @@ impl HashReaderMut for HashReader {
// Overrides the recorded actual size.
fn set_actual_size(&mut self, actual_size: i64) {
    self.actual_size = actual_size;
}
// Borrow of the expected content checksum, if one was configured.
fn content_hash(&self) -> &Option<Checksum> {
    &self.content_hash
}
// Borrow of the expected SHA-256 hex digest, if one was configured.
fn content_sha256(&self) -> &Option<String> {
    &self.content_sha256
}
// Borrow of the stored trailing-headers handle, if any.
fn get_trailer(&self) -> Option<&TrailingHeaders> {
    self.trailer_s3s.as_ref()
}
// Stores (or clears) the trailing-headers handle used for trailing checksums.
fn set_trailer(&mut self, trailer: Option<TrailingHeaders>) {
    self.trailer_s3s = trailer;
}
}
impl AsyncRead for HashReader {
fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<std::io::Result<()>> {
let this = self.project();
let poll = this.inner.poll_read(cx, buf);
if let Poll::Ready(Ok(())) = &poll {
let filled = buf.filled().len();
*this.bytes_read += filled as u64;
if filled == 0 {
// EOF
// TODO: check content_hash
let before = buf.filled().len();
match this.inner.poll_read(cx, buf) {
Poll::Pending => Poll::Pending,
Poll::Ready(Ok(())) => {
let data = &buf.filled()[before..];
let filled = data.len();
*this.bytes_read += filled as u64;
if filled > 0 {
// Update SHA256 hasher
if let Some(hasher) = this.content_sha256_hasher {
if let Err(e) = hasher.write_all(data) {
error!("SHA256 hasher write error, error={:?}", e);
return Poll::Ready(Err(std::io::Error::other(e)));
}
}
// Update content hasher
if let Some(hasher) = this.content_hasher {
if let Err(e) = hasher.write_all(data) {
return Poll::Ready(Err(std::io::Error::other(e)));
}
}
}
if filled == 0 && !*this.checksum_on_finish {
// check SHA256
if let (Some(hasher), Some(expected_sha256)) = (this.content_sha256_hasher, this.content_sha256) {
let sha256 = hex_simd::encode_to_string(hasher.finalize(), hex_simd::AsciiCase::Lower);
if sha256 != *expected_sha256 {
error!("SHA256 mismatch, expected={:?}, actual={:?}", expected_sha256, sha256);
return Poll::Ready(Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "SHA256 mismatch")));
}
}
// check content hasher
if let (Some(hasher), Some(expected_content_hash)) = (this.content_hasher, this.content_hash) {
if expected_content_hash.checksum_type.trailing() {
if let Some(trailer) = this.trailer_s3s.as_ref() {
if let Some(Some(checksum_str)) = trailer.read(|headers| {
expected_content_hash.checksum_type.key().and_then(|key| {
headers.get(key).and_then(|value| value.to_str().ok().map(|s| s.to_string()))
})
}) {
expected_content_hash.encoded = checksum_str;
expected_content_hash.raw = general_purpose::STANDARD
.decode(&expected_content_hash.encoded)
.map_err(|_| std::io::Error::other("Invalid base64 checksum"))?;
if expected_content_hash.raw.is_empty() {
return Poll::Ready(Err(std::io::Error::other("Content hash mismatch")));
}
}
}
} else {
let content_hash = hasher.finalize();
if content_hash != expected_content_hash.raw {
error!(
"Content hash mismatch, expected={:?}, actual={:?}",
hex_simd::encode_to_string(&expected_content_hash.raw, hex_simd::AsciiCase::Lower),
hex_simd::encode_to_string(content_hash, hex_simd::AsciiCase::Lower)
);
return Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
"Content hash mismatch",
)));
}
}
}
*this.checksum_on_finish = true;
}
Poll::Ready(Ok(()))
}
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
}
poll
}
}
@@ -323,7 +562,7 @@ mod tests {
// Test 1: Simple creation
let reader1 = BufReader::new(Cursor::new(&data[..]));
let reader1 = Box::new(WarpReader::new(reader1));
let hash_reader1 = HashReader::new(reader1, size, actual_size, etag.clone(), false).unwrap();
let hash_reader1 = HashReader::new(reader1, size, actual_size, etag.clone(), None, false).unwrap();
assert_eq!(hash_reader1.size(), size);
assert_eq!(hash_reader1.actual_size(), actual_size);
@@ -332,7 +571,7 @@ mod tests {
let reader2 = Box::new(WarpReader::new(reader2));
let hard_limit = HardLimitReader::new(reader2, size);
let hard_limit = Box::new(hard_limit);
let hash_reader2 = HashReader::new(hard_limit, size, actual_size, etag.clone(), false).unwrap();
let hash_reader2 = HashReader::new(hard_limit, size, actual_size, etag.clone(), None, false).unwrap();
assert_eq!(hash_reader2.size(), size);
assert_eq!(hash_reader2.actual_size(), actual_size);
@@ -341,7 +580,7 @@ mod tests {
let reader3 = Box::new(WarpReader::new(reader3));
let etag_reader = EtagReader::new(reader3, etag.clone());
let etag_reader = Box::new(etag_reader);
let hash_reader3 = HashReader::new(etag_reader, size, actual_size, etag.clone(), false).unwrap();
let hash_reader3 = HashReader::new(etag_reader, size, actual_size, etag.clone(), None, false).unwrap();
assert_eq!(hash_reader3.size(), size);
assert_eq!(hash_reader3.actual_size(), actual_size);
}
@@ -351,7 +590,7 @@ mod tests {
let data = b"hello hashreader";
let reader = BufReader::new(Cursor::new(&data[..]));
let reader = Box::new(WarpReader::new(reader));
let mut hash_reader = HashReader::new(reader, data.len() as i64, data.len() as i64, None, false).unwrap();
let mut hash_reader = HashReader::new(reader, data.len() as i64, data.len() as i64, None, None, false).unwrap();
let mut buf = Vec::new();
let _ = hash_reader.read_to_end(&mut buf).await.unwrap();
// Since we removed EtagReader integration, etag might be None
@@ -365,7 +604,7 @@ mod tests {
let data = b"no etag";
let reader = BufReader::new(Cursor::new(&data[..]));
let reader = Box::new(WarpReader::new(reader));
let mut hash_reader = HashReader::new(reader, data.len() as i64, data.len() as i64, None, true).unwrap();
let mut hash_reader = HashReader::new(reader, data.len() as i64, data.len() as i64, None, None, true).unwrap();
let mut buf = Vec::new();
let _ = hash_reader.read_to_end(&mut buf).await.unwrap();
// Etag should be None when diskable_md5 is true
@@ -381,10 +620,17 @@ mod tests {
let reader = Box::new(WarpReader::new(reader));
// Create a HashReader first
let hash_reader =
HashReader::new(reader, data.len() as i64, data.len() as i64, Some("test_etag".to_string()), false).unwrap();
HashReader::new(reader, data.len() as i64, data.len() as i64, Some("test_etag".to_string()), None, false).unwrap();
let hash_reader = Box::new(WarpReader::new(hash_reader));
// Now try to create another HashReader from the existing one using new
let result = HashReader::new(hash_reader, data.len() as i64, data.len() as i64, Some("test_etag".to_string()), false);
let result = HashReader::new(
hash_reader,
data.len() as i64,
data.len() as i64,
Some("test_etag".to_string()),
None,
false,
);
assert!(result.is_ok());
let final_reader = result.unwrap();
@@ -422,7 +668,7 @@ mod tests {
let reader = Box::new(WarpReader::new(reader));
// Create HashReader
let mut hr = HashReader::new(reader, size, actual_size, Some(expected.clone()), false).unwrap();
let mut hr = HashReader::new(reader, size, actual_size, Some(expected.clone()), None, false).unwrap();
// If compression is enabled, compress data first
let compressed_data = if is_compress {
@@ -518,7 +764,7 @@ mod tests {
let reader = BufReader::new(Cursor::new(data.clone()));
let reader = Box::new(WarpReader::new(reader));
let hash_reader = HashReader::new(reader, data.len() as i64, data.len() as i64, None, false).unwrap();
let hash_reader = HashReader::new(reader, data.len() as i64, data.len() as i64, None, None, false).unwrap();
// Test compression
let compress_reader = CompressReader::new(hash_reader, CompressionAlgorithm::Gzip);
@@ -564,7 +810,7 @@ mod tests {
let reader = BufReader::new(Cursor::new(data.clone()));
let reader = Box::new(WarpReader::new(reader));
let hash_reader = HashReader::new(reader, data.len() as i64, data.len() as i64, None, false).unwrap();
let hash_reader = HashReader::new(reader, data.len() as i64, data.len() as i64, None, None, false).unwrap();
// Compress
let compress_reader = CompressReader::new(hash_reader, algorithm);