diff --git a/crates/ahm/src/heal/storage.rs b/crates/ahm/src/heal/storage.rs index 48e485b5..2ff8e885 100644 --- a/crates/ahm/src/heal/storage.rs +++ b/crates/ahm/src/heal/storage.rs @@ -148,27 +148,47 @@ impl HealStorageAPI for ECStoreHealStorage { async fn get_object_data(&self, bucket: &str, object: &str) -> Result>> { debug!("Getting object data: {}/{}", bucket, object); - match (*self.ecstore) + let reader = match (*self.ecstore) .get_object_reader(bucket, object, None, Default::default(), &Default::default()) .await { - Ok(mut reader) => match reader.read_all().await { - Ok(data) => Ok(Some(data)), + Ok(reader) => reader, + Err(e) => { + error!("Failed to get object: {}/{} - {}", bucket, object, e); + return Err(Error::other(e)); + } + }; + + // WARNING: Returning Vec for large objects is dangerous. To avoid OOM, cap the read size. + // If needed, refactor callers to stream instead of buffering entire object. + const MAX_READ_BYTES: usize = 16 * 1024 * 1024; // 16 MiB cap + let mut buf = Vec::with_capacity(1024 * 1024); + use tokio::io::AsyncReadExt as _; + let mut n_read: usize = 0; + let mut stream = reader.stream; + loop { + // Read in chunks + let mut chunk = vec![0u8; 1024 * 1024]; + match stream.read(&mut chunk).await { + Ok(0) => break, + Ok(n) => { + buf.extend_from_slice(&chunk[..n]); + n_read += n; + if n_read > MAX_READ_BYTES { + warn!( + "Object data exceeds cap ({} bytes), aborting full read to prevent OOM: {}/{}", + MAX_READ_BYTES, bucket, object + ); + return Ok(None); + } + } Err(e) => { error!("Failed to read object data: {}/{} - {}", bucket, object, e); - Err(Error::other(e)) - } - }, - Err(e) => { - if matches!(e, rustfs_ecstore::error::StorageError::ObjectNotFound(_, _)) { - debug!("Object data not found: {}/{}", bucket, object); - Ok(None) - } else { - error!("Failed to get object: {}/{} - {}", bucket, object, e); - Err(Error::other(e)) + return Err(Error::other(e)); } } } + Ok(Some(buf)) } async fn put_object_data(&self, bucket: &str, object: &str, data: &[u8]) -> Result<()> { @@ -208,27 +228,34 @@ impl HealStorageAPI for ECStoreHealStorage { async fn verify_object_integrity(&self, bucket: &str, object: &str) -> Result { debug!("Verifying object integrity: {}/{}", bucket, object); - // Try to get object info and data to verify integrity + // Check object metadata first match self.get_object_meta(bucket, object).await? { Some(obj_info) => { - // Check if object has valid metadata if obj_info.size < 0 { warn!("Object has invalid size: {}/{}", bucket, object); return Ok(false); } - // Try to read object data to verify it's accessible - match self.get_object_data(bucket, object).await { - Ok(Some(_)) => { - info!("Object integrity check passed: {}/{}", bucket, object); - Ok(true) + // Stream-read the object to a sink to avoid loading into memory + match (*self.ecstore) + .get_object_reader(bucket, object, None, Default::default(), &Default::default()) + .await + { + Ok(reader) => { + let mut stream = reader.stream; + match tokio::io::copy(&mut stream, &mut tokio::io::sink()).await { + Ok(_) => { + info!("Object integrity check passed: {}/{}", bucket, object); + Ok(true) + } + Err(e) => { + warn!("Object stream read failed: {}/{} - {}", bucket, object, e); + Ok(false) + } + } } - Ok(None) => { - warn!("Object data not found: {}/{}", bucket, object); - Ok(false) - } - Err(_) => { - warn!("Object data read failed: {}/{}", bucket, object); + Err(e) => { + warn!("Failed to get object reader: {}/{} - {}", bucket, object, e); Ok(false) } } diff --git a/crates/config/src/constants/mod.rs b/crates/config/src/constants/mod.rs index aecd55b5..3bc007b2 100644 --- a/crates/config/src/constants/mod.rs +++ b/crates/config/src/constants/mod.rs @@ -12,6 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub(crate) mod app; -pub(crate) mod env; -pub(crate) mod tls; +pub mod app; +pub mod env; +pub mod tls; diff --git a/crates/ecstore/src/set_disk.rs b/crates/ecstore/src/set_disk.rs index 8f486fd4..6878f232 100644 --- a/crates/ecstore/src/set_disk.rs +++ b/crates/ecstore/src/set_disk.rs @@ -5354,9 +5354,10 @@ impl StorageAPI for SetDisks { #[tracing::instrument(skip(self))] async fn verify_object_integrity(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result<()> { - let mut get_object_reader = - ::get_object_reader(self, bucket, object, None, HeaderMap::new(), opts).await?; - let _ = get_object_reader.read_all().await?; + let get_object_reader = ::get_object_reader(self, bucket, object, None, HeaderMap::new(), opts).await?; + // Stream to sink to avoid loading entire object into memory during verification + let mut reader = get_object_reader.stream; + tokio::io::copy(&mut reader, &mut tokio::io::sink()).await?; Ok(()) } } diff --git a/crates/ecstore/src/sets.rs b/crates/ecstore/src/sets.rs index 7ec6e01d..8b556be0 100644 --- a/crates/ecstore/src/sets.rs +++ b/crates/ecstore/src/sets.rs @@ -882,11 +882,15 @@ impl StorageAPI for Sets { unimplemented!() } - #[tracing::instrument(skip(self))] + #[tracing::instrument(level = "debug", skip(self))] async fn verify_object_integrity(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result<()> { - self.get_disks_by_key(object) - .verify_object_integrity(bucket, object, opts) - .await + let gor = self.get_object_reader(bucket, object, None, HeaderMap::new(), opts).await?; + let mut reader = gor.stream; + + // Stream data to sink instead of reading all into memory to prevent OOM + tokio::io::copy(&mut reader, &mut tokio::io::sink()).await?; + + Ok(()) } } diff --git a/crates/ecstore/src/store.rs b/crates/ecstore/src/store.rs index 716f0d07..ccfba636 100644 --- a/crates/ecstore/src/store.rs +++ b/crates/ecstore/src/store.rs @@ -2238,9 +2238,10 @@ impl StorageAPI for ECStore { } async fn verify_object_integrity(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result<()> { - let mut get_object_reader = - ::get_object_reader(self, bucket, object, None, HeaderMap::new(), opts).await?; - let _ = get_object_reader.read_all().await?; + let get_object_reader = ::get_object_reader(self, bucket, object, None, HeaderMap::new(), opts).await?; + // Stream to sink to avoid loading entire object into memory during verification + let mut reader = get_object_reader.stream; + tokio::io::copy(&mut reader, &mut tokio::io::sink()).await?; Ok(()) } } diff --git a/crates/notify/Cargo.toml b/crates/notify/Cargo.toml index bf58db49..c98bd9ce 100644 --- a/crates/notify/Cargo.toml +++ b/crates/notify/Cargo.toml @@ -26,7 +26,7 @@ categories = ["web-programming", "development-tools", "filesystem"] documentation = "https://docs.rs/rustfs-notify/latest/rustfs_notify/" [dependencies] -rustfs-config = { workspace = true, features = ["notify"] } +rustfs-config = { workspace = true, features = ["notify", "constants"] } rustfs-ecstore = { workspace = true } rustfs-utils = { workspace = true, features = ["path", "sys"] } async-trait = { workspace = true } diff --git a/crates/utils/src/certs.rs b/crates/utils/src/certs.rs index b0c3a118..12fdbc3a 100644 --- a/crates/utils/src/certs.rs +++ b/crates/utils/src/certs.rs @@ -196,12 +196,13 @@ pub fn create_multi_cert_resolver( /// Checks if TLS key logging is enabled. pub fn tls_key_log() -> bool { - env::var(rustfs_config::ENV_TLS_KEYLOG) + env::var("RUSTFS_TLS_KEYLOG") .map(|v| { - v.eq_ignore_ascii_case(rustfs_config::EnableState::One.as_str()) - || v.eq_ignore_ascii_case(rustfs_config::EnableState::On.as_str()) - || v.eq_ignore_ascii_case(rustfs_config::EnableState::True.as_str()) - || v.eq_ignore_ascii_case(rustfs_config::EnableState::Yes.as_str()) + let v = v.trim(); + v.eq_ignore_ascii_case("1") + || v.eq_ignore_ascii_case("on") + || v.eq_ignore_ascii_case("true") + || v.eq_ignore_ascii_case("yes") }) .unwrap_or(false) }