From 24e3d3a2ce4f594d778895d37ac622d5ed960e5a Mon Sep 17 00:00:00 2001 From: junxiang Mu <1948535941@qq.com> Date: Sun, 10 Aug 2025 21:19:48 +0800 Subject: [PATCH 1/2] refactor(ecstore): Optimize memory usage for object integrity verification Change the object integrity verification from reading all data to streaming processing to avoid memory overflow caused by large objects. Modify the TLS key log check to use environment variables directly instead of configuration constants. Add memory limits for object data reading in the AHM module. Signed-off-by: junxiang Mu <1948535941@qq.com> --- crates/ahm/src/heal/storage.rs | 79 ++++++++++++++++++++---------- crates/config/src/constants/mod.rs | 6 +-- crates/ecstore/src/set_disk.rs | 7 +-- crates/ecstore/src/sets.rs | 12 +++-- crates/ecstore/src/store.rs | 7 +-- crates/notify/Cargo.toml | 2 +- crates/utils/src/certs.rs | 11 +++-- 7 files changed, 79 insertions(+), 45 deletions(-) diff --git a/crates/ahm/src/heal/storage.rs b/crates/ahm/src/heal/storage.rs index 48e485b5..2ff8e885 100644 --- a/crates/ahm/src/heal/storage.rs +++ b/crates/ahm/src/heal/storage.rs @@ -148,27 +148,47 @@ impl HealStorageAPI for ECStoreHealStorage { async fn get_object_data(&self, bucket: &str, object: &str) -> Result>> { debug!("Getting object data: {}/{}", bucket, object); - match (*self.ecstore) + let reader = match (*self.ecstore) .get_object_reader(bucket, object, None, Default::default(), &Default::default()) .await { - Ok(mut reader) => match reader.read_all().await { - Ok(data) => Ok(Some(data)), + Ok(reader) => reader, + Err(e) => { + error!("Failed to get object: {}/{} - {}", bucket, object, e); + return Err(Error::other(e)); + } + }; + + // WARNING: Returning Vec for large objects is dangerous. To avoid OOM, cap the read size. + // If needed, refactor callers to stream instead of buffering entire object. + const MAX_READ_BYTES: usize = 16 * 1024 * 1024; // 16 MiB cap + let mut buf = Vec::with_capacity(1024 * 1024); + use tokio::io::AsyncReadExt as _; + let mut n_read: usize = 0; + let mut stream = reader.stream; + loop { + // Read in chunks + let mut chunk = vec![0u8; 1024 * 1024]; + match stream.read(&mut chunk).await { + Ok(0) => break, + Ok(n) => { + buf.extend_from_slice(&chunk[..n]); + n_read += n; + if n_read > MAX_READ_BYTES { + warn!( + "Object data exceeds cap ({} bytes), aborting full read to prevent OOM: {}/{}", + MAX_READ_BYTES, bucket, object + ); + return Ok(None); + } + } Err(e) => { error!("Failed to read object data: {}/{} - {}", bucket, object, e); - Err(Error::other(e)) - } - }, - Err(e) => { - if matches!(e, rustfs_ecstore::error::StorageError::ObjectNotFound(_, _)) { - debug!("Object data not found: {}/{}", bucket, object); - Ok(None) - } else { - error!("Failed to get object: {}/{} - {}", bucket, object, e); - Err(Error::other(e)) + return Err(Error::other(e)); } } } + Ok(Some(buf)) } async fn put_object_data(&self, bucket: &str, object: &str, data: &[u8]) -> Result<()> { @@ -208,27 +228,34 @@ impl HealStorageAPI for ECStoreHealStorage { async fn verify_object_integrity(&self, bucket: &str, object: &str) -> Result { debug!("Verifying object integrity: {}/{}", bucket, object); - // Try to get object info and data to verify integrity + // Check object metadata first match self.get_object_meta(bucket, object).await? { Some(obj_info) => { - // Check if object has valid metadata if obj_info.size < 0 { warn!("Object has invalid size: {}/{}", bucket, object); return Ok(false); } - // Try to read object data to verify it's accessible - match self.get_object_data(bucket, object).await { - Ok(Some(_)) => { - info!("Object integrity check passed: {}/{}", bucket, object); - Ok(true) + // Stream-read the object to a sink to avoid loading into memory + match (*self.ecstore) + .get_object_reader(bucket, object, None, Default::default(), &Default::default()) + .await + { + Ok(reader) => { + let mut stream = reader.stream; + match tokio::io::copy(&mut stream, &mut tokio::io::sink()).await { + Ok(_) => { + info!("Object integrity check passed: {}/{}", bucket, object); + Ok(true) + } + Err(e) => { + warn!("Object stream read failed: {}/{} - {}", bucket, object, e); + Ok(false) + } + } } - Ok(None) => { - warn!("Object data not found: {}/{}", bucket, object); - Ok(false) - } - Err(_) => { - warn!("Object data read failed: {}/{}", bucket, object); + Err(e) => { + warn!("Failed to get object reader: {}/{} - {}", bucket, object, e); Ok(false) } } diff --git a/crates/config/src/constants/mod.rs b/crates/config/src/constants/mod.rs index aecd55b5..3bc007b2 100644 --- a/crates/config/src/constants/mod.rs +++ b/crates/config/src/constants/mod.rs @@ -12,6 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub(crate) mod app; -pub(crate) mod env; -pub(crate) mod tls; +pub mod app; +pub mod env; +pub mod tls; diff --git a/crates/ecstore/src/set_disk.rs b/crates/ecstore/src/set_disk.rs index 8f486fd4..6878f232 100644 --- a/crates/ecstore/src/set_disk.rs +++ b/crates/ecstore/src/set_disk.rs @@ -5354,9 +5354,10 @@ impl StorageAPI for SetDisks { #[tracing::instrument(skip(self))] async fn verify_object_integrity(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result<()> { - let mut get_object_reader = - ::get_object_reader(self, bucket, object, None, HeaderMap::new(), opts).await?; - let _ = get_object_reader.read_all().await?; + let get_object_reader = ::get_object_reader(self, bucket, object, None, HeaderMap::new(), opts).await?; + // Stream to sink to avoid loading entire object into memory during verification + let mut reader = get_object_reader.stream; + tokio::io::copy(&mut reader, &mut tokio::io::sink()).await?; Ok(()) } } diff --git a/crates/ecstore/src/sets.rs b/crates/ecstore/src/sets.rs index 7ec6e01d..8b556be0 100644 --- a/crates/ecstore/src/sets.rs +++ b/crates/ecstore/src/sets.rs @@ -882,11 +882,15 @@ impl StorageAPI for Sets { unimplemented!() } - #[tracing::instrument(skip(self))] + #[tracing::instrument(level = "debug", skip(self))] async fn verify_object_integrity(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result<()> { - self.get_disks_by_key(object) - .verify_object_integrity(bucket, object, opts) - .await + let gor = self.get_object_reader(bucket, object, None, HeaderMap::new(), opts).await?; + let mut reader = gor.stream; + + // Stream data to sink instead of reading all into memory to prevent OOM + tokio::io::copy(&mut reader, &mut tokio::io::sink()).await?; + + Ok(()) } } diff --git a/crates/ecstore/src/store.rs b/crates/ecstore/src/store.rs index 716f0d07..ccfba636 100644 --- a/crates/ecstore/src/store.rs +++ b/crates/ecstore/src/store.rs @@ -2238,9 +2238,10 @@ impl StorageAPI for ECStore { } async fn verify_object_integrity(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result<()> { - let mut get_object_reader = - ::get_object_reader(self, bucket, object, None, HeaderMap::new(), opts).await?; - let _ = get_object_reader.read_all().await?; + let get_object_reader = ::get_object_reader(self, bucket, object, None, HeaderMap::new(), opts).await?; + // Stream to sink to avoid loading entire object into memory during verification + let mut reader = get_object_reader.stream; + tokio::io::copy(&mut reader, &mut tokio::io::sink()).await?; Ok(()) } } diff --git a/crates/notify/Cargo.toml b/crates/notify/Cargo.toml index bf58db49..c98bd9ce 100644 --- a/crates/notify/Cargo.toml +++ b/crates/notify/Cargo.toml @@ -26,7 +26,7 @@ categories = ["web-programming", "development-tools", "filesystem"] documentation = "https://docs.rs/rustfs-notify/latest/rustfs_notify/" [dependencies] -rustfs-config = { workspace = true, features = ["notify"] } +rustfs-config = { workspace = true, features = ["notify", "constants"] } rustfs-ecstore = { workspace = true } rustfs-utils = { workspace = true, features = ["path", "sys"] } async-trait = { workspace = true } diff --git a/crates/utils/src/certs.rs b/crates/utils/src/certs.rs index b0c3a118..12fdbc3a 100644 --- a/crates/utils/src/certs.rs +++ b/crates/utils/src/certs.rs @@ -196,12 +196,13 @@ pub fn create_multi_cert_resolver( /// Checks if TLS key logging is enabled. pub fn tls_key_log() -> bool { - env::var(rustfs_config::ENV_TLS_KEYLOG) + env::var("RUSTFS_TLS_KEYLOG") .map(|v| { - v.eq_ignore_ascii_case(rustfs_config::EnableState::One.as_str()) - || v.eq_ignore_ascii_case(rustfs_config::EnableState::On.as_str()) - || v.eq_ignore_ascii_case(rustfs_config::EnableState::True.as_str()) - || v.eq_ignore_ascii_case(rustfs_config::EnableState::Yes.as_str()) + let v = v.trim(); + v.eq_ignore_ascii_case("1") + || v.eq_ignore_ascii_case("on") + || v.eq_ignore_ascii_case("true") + || v.eq_ignore_ascii_case("yes") }) .unwrap_or(false) } From 3497ccfada9554b2cb04458f735ac31af2d607de Mon Sep 17 00:00:00 2001 From: junxiang Mu <1948535941@qq.com> Date: Sun, 10 Aug 2025 21:29:30 +0800 Subject: [PATCH 2/2] Chore: reduce PR template checklist Signed-off-by: junxiang Mu <1948535941@qq.com> --- .github/pull_request_template.md | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/.github/pull_request_template.md b/.github/pull_request_template.md index 43f63667..7f346586 100644 --- a/.github/pull_request_template.md +++ b/.github/pull_request_template.md @@ -19,9 +19,7 @@ Pull Request Template for RustFS ## Checklist - [ ] I have read and followed the [CONTRIBUTING.md](CONTRIBUTING.md) guidelines -- [ ] Code is formatted with `cargo fmt --all` -- [ ] Passed `cargo clippy --all-targets --all-features -- -D warnings` -- [ ] Passed `cargo check --all-targets` +- [ ] Passed `make pre-commit` - [ ] Added/updated necessary tests - [ ] Documentation updated (if needed) - [ ] CI/CD passed (if applicable)