From fe4fabb195c0fe90f4fc0ca04ca564bb5ffb837f Mon Sep 17 00:00:00 2001 From: yihong Date: Tue, 16 Dec 2025 11:45:45 +0800 Subject: [PATCH] fix: other two memory leak in the code base (#1160) Signed-off-by: yihong0618 Co-authored-by: houseme --- Cargo.lock | 1 + crates/s3select-api/Cargo.toml | 1 + crates/s3select-api/src/query/execution.rs | 27 +++++++++++----------- 3 files changed, 16 insertions(+), 13 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index dc864a44..f5118588 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7535,6 +7535,7 @@ dependencies = [ "futures-core", "http 1.4.0", "object_store", + "parking_lot", "pin-project-lite", "rustfs-common", "rustfs-ecstore", diff --git a/crates/s3select-api/Cargo.toml b/crates/s3select-api/Cargo.toml index bcb575b5..60c14156 100644 --- a/crates/s3select-api/Cargo.toml +++ b/crates/s3select-api/Cargo.toml @@ -39,6 +39,7 @@ object_store = { workspace = true } pin-project-lite.workspace = true s3s.workspace = true snafu = { workspace = true, features = ["backtrace"] } +parking_lot.workspace = true tokio.workspace = true tokio-util.workspace = true tracing.workspace = true diff --git a/crates/s3select-api/src/query/execution.rs b/crates/s3select-api/src/query/execution.rs index ce26ff0c..86559908 100644 --- a/crates/s3select-api/src/query/execution.rs +++ b/crates/s3select-api/src/query/execution.rs @@ -15,10 +15,11 @@ use std::fmt::Display; use std::pin::Pin; use std::sync::Arc; -use std::sync::atomic::{AtomicPtr, Ordering}; use std::task::{Context, Poll}; use std::time::{Duration, Instant}; +use parking_lot::RwLock; + use async_trait::async_trait; use datafusion::arrow::datatypes::{Schema, SchemaRef}; use datafusion::arrow::record_batch::RecordBatch; @@ -132,7 +133,7 @@ pub struct QueryStateMachine { pub session: SessionCtx, pub query: Query, - state: AtomicPtr, + state: RwLock, start: Instant, } @@ -141,14 +142,14 @@ impl QueryStateMachine { Self { session, query, - state: AtomicPtr::new(Box::into_raw(Box::new(QueryState::ACCEPTING))), + state: RwLock::new(QueryState::ACCEPTING), start: Instant::now(), } } pub fn begin_analyze(&self) { // TODO record time - self.translate_to(Box::new(QueryState::RUNNING(RUNNING::ANALYZING))); + self.translate_to(QueryState::RUNNING(RUNNING::ANALYZING)); } pub fn end_analyze(&self) { @@ -157,7 +158,7 @@ impl QueryStateMachine { pub fn begin_optimize(&self) { // TODO record time - self.translate_to(Box::new(QueryState::RUNNING(RUNNING::OPTIMIZING))); + self.translate_to(QueryState::RUNNING(RUNNING::OPTIMIZING)); } pub fn end_optimize(&self) { @@ -166,7 +167,7 @@ impl QueryStateMachine { pub fn begin_schedule(&self) { // TODO - self.translate_to(Box::new(QueryState::RUNNING(RUNNING::SCHEDULING))); + self.translate_to(QueryState::RUNNING(RUNNING::SCHEDULING)); } pub fn end_schedule(&self) { @@ -175,29 +176,29 @@ impl QueryStateMachine { pub fn finish(&self) { // TODO - self.translate_to(Box::new(QueryState::DONE(DONE::FINISHED))); + self.translate_to(QueryState::DONE(DONE::FINISHED)); } pub fn cancel(&self) { // TODO - self.translate_to(Box::new(QueryState::DONE(DONE::CANCELLED))); + self.translate_to(QueryState::DONE(DONE::CANCELLED)); } pub fn fail(&self) { // TODO - self.translate_to(Box::new(QueryState::DONE(DONE::FAILED))); + self.translate_to(QueryState::DONE(DONE::FAILED)); } - pub fn state(&self) -> &QueryState { - unsafe { &*self.state.load(Ordering::Relaxed) } + pub fn state(&self) -> QueryState { + self.state.read().clone() } pub fn duration(&self) -> Duration { self.start.elapsed() } - fn translate_to(&self, state: Box) { - self.state.store(Box::into_raw(state), Ordering::Relaxed); + fn translate_to(&self, state: QueryState) { + *self.state.write() = state; } }