mirror of
https://github.com/rustfs/rustfs.git
synced 2026-01-17 01:30:33 +00:00
Merge pull request #372 from guojidan/fix-scanner
refactor(ecstore): Optimize memory usage for object integrity verification
This commit is contained in:
4
.github/pull_request_template.md
vendored
4
.github/pull_request_template.md
vendored
@@ -19,9 +19,7 @@ Pull Request Template for RustFS
|
||||
|
||||
## Checklist
|
||||
- [ ] I have read and followed the [CONTRIBUTING.md](CONTRIBUTING.md) guidelines
|
||||
- [ ] Code is formatted with `cargo fmt --all`
|
||||
- [ ] Passed `cargo clippy --all-targets --all-features -- -D warnings`
|
||||
- [ ] Passed `cargo check --all-targets`
|
||||
- [ ] Passed `make pre-commit`
|
||||
- [ ] Added/updated necessary tests
|
||||
- [ ] Documentation updated (if needed)
|
||||
- [ ] CI/CD passed (if applicable)
|
||||
|
||||
@@ -148,27 +148,47 @@ impl HealStorageAPI for ECStoreHealStorage {
|
||||
async fn get_object_data(&self, bucket: &str, object: &str) -> Result<Option<Vec<u8>>> {
|
||||
debug!("Getting object data: {}/{}", bucket, object);
|
||||
|
||||
match (*self.ecstore)
|
||||
let reader = match (*self.ecstore)
|
||||
.get_object_reader(bucket, object, None, Default::default(), &Default::default())
|
||||
.await
|
||||
{
|
||||
Ok(mut reader) => match reader.read_all().await {
|
||||
Ok(data) => Ok(Some(data)),
|
||||
Ok(reader) => reader,
|
||||
Err(e) => {
|
||||
error!("Failed to get object: {}/{} - {}", bucket, object, e);
|
||||
return Err(Error::other(e));
|
||||
}
|
||||
};
|
||||
|
||||
// WARNING: Returning Vec<u8> for large objects is dangerous. To avoid OOM, cap the read size.
|
||||
// If needed, refactor callers to stream instead of buffering entire object.
|
||||
const MAX_READ_BYTES: usize = 16 * 1024 * 1024; // 16 MiB cap
|
||||
let mut buf = Vec::with_capacity(1024 * 1024);
|
||||
use tokio::io::AsyncReadExt as _;
|
||||
let mut n_read: usize = 0;
|
||||
let mut stream = reader.stream;
|
||||
loop {
|
||||
// Read in chunks
|
||||
let mut chunk = vec![0u8; 1024 * 1024];
|
||||
match stream.read(&mut chunk).await {
|
||||
Ok(0) => break,
|
||||
Ok(n) => {
|
||||
buf.extend_from_slice(&chunk[..n]);
|
||||
n_read += n;
|
||||
if n_read > MAX_READ_BYTES {
|
||||
warn!(
|
||||
"Object data exceeds cap ({} bytes), aborting full read to prevent OOM: {}/{}",
|
||||
MAX_READ_BYTES, bucket, object
|
||||
);
|
||||
return Ok(None);
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to read object data: {}/{} - {}", bucket, object, e);
|
||||
Err(Error::other(e))
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
if matches!(e, rustfs_ecstore::error::StorageError::ObjectNotFound(_, _)) {
|
||||
debug!("Object data not found: {}/{}", bucket, object);
|
||||
Ok(None)
|
||||
} else {
|
||||
error!("Failed to get object: {}/{} - {}", bucket, object, e);
|
||||
Err(Error::other(e))
|
||||
return Err(Error::other(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(Some(buf))
|
||||
}
|
||||
|
||||
async fn put_object_data(&self, bucket: &str, object: &str, data: &[u8]) -> Result<()> {
|
||||
@@ -208,27 +228,34 @@ impl HealStorageAPI for ECStoreHealStorage {
|
||||
async fn verify_object_integrity(&self, bucket: &str, object: &str) -> Result<bool> {
|
||||
debug!("Verifying object integrity: {}/{}", bucket, object);
|
||||
|
||||
// Try to get object info and data to verify integrity
|
||||
// Check object metadata first
|
||||
match self.get_object_meta(bucket, object).await? {
|
||||
Some(obj_info) => {
|
||||
// Check if object has valid metadata
|
||||
if obj_info.size < 0 {
|
||||
warn!("Object has invalid size: {}/{}", bucket, object);
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
// Try to read object data to verify it's accessible
|
||||
match self.get_object_data(bucket, object).await {
|
||||
Ok(Some(_)) => {
|
||||
info!("Object integrity check passed: {}/{}", bucket, object);
|
||||
Ok(true)
|
||||
// Stream-read the object to a sink to avoid loading into memory
|
||||
match (*self.ecstore)
|
||||
.get_object_reader(bucket, object, None, Default::default(), &Default::default())
|
||||
.await
|
||||
{
|
||||
Ok(reader) => {
|
||||
let mut stream = reader.stream;
|
||||
match tokio::io::copy(&mut stream, &mut tokio::io::sink()).await {
|
||||
Ok(_) => {
|
||||
info!("Object integrity check passed: {}/{}", bucket, object);
|
||||
Ok(true)
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("Object stream read failed: {}/{} - {}", bucket, object, e);
|
||||
Ok(false)
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(None) => {
|
||||
warn!("Object data not found: {}/{}", bucket, object);
|
||||
Ok(false)
|
||||
}
|
||||
Err(_) => {
|
||||
warn!("Object data read failed: {}/{}", bucket, object);
|
||||
Err(e) => {
|
||||
warn!("Failed to get object reader: {}/{} - {}", bucket, object, e);
|
||||
Ok(false)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,6 +12,6 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
pub(crate) mod app;
|
||||
pub(crate) mod env;
|
||||
pub(crate) mod tls;
|
||||
pub mod app;
|
||||
pub mod env;
|
||||
pub mod tls;
|
||||
|
||||
@@ -5354,9 +5354,10 @@ impl StorageAPI for SetDisks {
|
||||
|
||||
#[tracing::instrument(skip(self))]
|
||||
async fn verify_object_integrity(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result<()> {
|
||||
let mut get_object_reader =
|
||||
<Self as ObjectIO>::get_object_reader(self, bucket, object, None, HeaderMap::new(), opts).await?;
|
||||
let _ = get_object_reader.read_all().await?;
|
||||
let get_object_reader = <Self as ObjectIO>::get_object_reader(self, bucket, object, None, HeaderMap::new(), opts).await?;
|
||||
// Stream to sink to avoid loading entire object into memory during verification
|
||||
let mut reader = get_object_reader.stream;
|
||||
tokio::io::copy(&mut reader, &mut tokio::io::sink()).await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -882,11 +882,15 @@ impl StorageAPI for Sets {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(self))]
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
async fn verify_object_integrity(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result<()> {
|
||||
self.get_disks_by_key(object)
|
||||
.verify_object_integrity(bucket, object, opts)
|
||||
.await
|
||||
let gor = self.get_object_reader(bucket, object, None, HeaderMap::new(), opts).await?;
|
||||
let mut reader = gor.stream;
|
||||
|
||||
// Stream data to sink instead of reading all into memory to prevent OOM
|
||||
tokio::io::copy(&mut reader, &mut tokio::io::sink()).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -2238,9 +2238,10 @@ impl StorageAPI for ECStore {
|
||||
}
|
||||
|
||||
async fn verify_object_integrity(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result<()> {
|
||||
let mut get_object_reader =
|
||||
<Self as ObjectIO>::get_object_reader(self, bucket, object, None, HeaderMap::new(), opts).await?;
|
||||
let _ = get_object_reader.read_all().await?;
|
||||
let get_object_reader = <Self as ObjectIO>::get_object_reader(self, bucket, object, None, HeaderMap::new(), opts).await?;
|
||||
// Stream to sink to avoid loading entire object into memory during verification
|
||||
let mut reader = get_object_reader.stream;
|
||||
tokio::io::copy(&mut reader, &mut tokio::io::sink()).await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -26,7 +26,7 @@ categories = ["web-programming", "development-tools", "filesystem"]
|
||||
documentation = "https://docs.rs/rustfs-notify/latest/rustfs_notify/"
|
||||
|
||||
[dependencies]
|
||||
rustfs-config = { workspace = true, features = ["notify"] }
|
||||
rustfs-config = { workspace = true, features = ["notify", "constants"] }
|
||||
rustfs-ecstore = { workspace = true }
|
||||
rustfs-utils = { workspace = true, features = ["path", "sys"] }
|
||||
async-trait = { workspace = true }
|
||||
|
||||
@@ -196,12 +196,13 @@ pub fn create_multi_cert_resolver(
|
||||
|
||||
/// Checks if TLS key logging is enabled.
|
||||
pub fn tls_key_log() -> bool {
|
||||
env::var(rustfs_config::ENV_TLS_KEYLOG)
|
||||
env::var("RUSTFS_TLS_KEYLOG")
|
||||
.map(|v| {
|
||||
v.eq_ignore_ascii_case(rustfs_config::EnableState::One.as_str())
|
||||
|| v.eq_ignore_ascii_case(rustfs_config::EnableState::On.as_str())
|
||||
|| v.eq_ignore_ascii_case(rustfs_config::EnableState::True.as_str())
|
||||
|| v.eq_ignore_ascii_case(rustfs_config::EnableState::Yes.as_str())
|
||||
let v = v.trim();
|
||||
v.eq_ignore_ascii_case("1")
|
||||
|| v.eq_ignore_ascii_case("on")
|
||||
|| v.eq_ignore_ascii_case("true")
|
||||
|| v.eq_ignore_ascii_case("yes")
|
||||
})
|
||||
.unwrap_or(false)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user