mirror of
https://github.com/rustfs/rustfs.git
synced 2026-01-17 01:30:33 +00:00
fix: other two memory leak in the code base (#1160)
Signed-off-by: yihong0618 <zouzou0208@gmail.com> Co-authored-by: houseme <housemecn@gmail.com>
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -7535,6 +7535,7 @@ dependencies = [
|
|||||||
"futures-core",
|
"futures-core",
|
||||||
"http 1.4.0",
|
"http 1.4.0",
|
||||||
"object_store",
|
"object_store",
|
||||||
|
"parking_lot",
|
||||||
"pin-project-lite",
|
"pin-project-lite",
|
||||||
"rustfs-common",
|
"rustfs-common",
|
||||||
"rustfs-ecstore",
|
"rustfs-ecstore",
|
||||||
|
|||||||
@@ -39,6 +39,7 @@ object_store = { workspace = true }
|
|||||||
pin-project-lite.workspace = true
|
pin-project-lite.workspace = true
|
||||||
s3s.workspace = true
|
s3s.workspace = true
|
||||||
snafu = { workspace = true, features = ["backtrace"] }
|
snafu = { workspace = true, features = ["backtrace"] }
|
||||||
|
parking_lot.workspace = true
|
||||||
tokio.workspace = true
|
tokio.workspace = true
|
||||||
tokio-util.workspace = true
|
tokio-util.workspace = true
|
||||||
tracing.workspace = true
|
tracing.workspace = true
|
||||||
|
|||||||
@@ -15,10 +15,11 @@
|
|||||||
use std::fmt::Display;
|
use std::fmt::Display;
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::sync::atomic::{AtomicPtr, Ordering};
|
|
||||||
use std::task::{Context, Poll};
|
use std::task::{Context, Poll};
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
|
use parking_lot::RwLock;
|
||||||
|
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use datafusion::arrow::datatypes::{Schema, SchemaRef};
|
use datafusion::arrow::datatypes::{Schema, SchemaRef};
|
||||||
use datafusion::arrow::record_batch::RecordBatch;
|
use datafusion::arrow::record_batch::RecordBatch;
|
||||||
@@ -132,7 +133,7 @@ pub struct QueryStateMachine {
|
|||||||
pub session: SessionCtx,
|
pub session: SessionCtx,
|
||||||
pub query: Query,
|
pub query: Query,
|
||||||
|
|
||||||
state: AtomicPtr<QueryState>,
|
state: RwLock<QueryState>,
|
||||||
start: Instant,
|
start: Instant,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -141,14 +142,14 @@ impl QueryStateMachine {
|
|||||||
Self {
|
Self {
|
||||||
session,
|
session,
|
||||||
query,
|
query,
|
||||||
state: AtomicPtr::new(Box::into_raw(Box::new(QueryState::ACCEPTING))),
|
state: RwLock::new(QueryState::ACCEPTING),
|
||||||
start: Instant::now(),
|
start: Instant::now(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn begin_analyze(&self) {
|
pub fn begin_analyze(&self) {
|
||||||
// TODO record time
|
// TODO record time
|
||||||
self.translate_to(Box::new(QueryState::RUNNING(RUNNING::ANALYZING)));
|
self.translate_to(QueryState::RUNNING(RUNNING::ANALYZING));
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn end_analyze(&self) {
|
pub fn end_analyze(&self) {
|
||||||
@@ -157,7 +158,7 @@ impl QueryStateMachine {
|
|||||||
|
|
||||||
pub fn begin_optimize(&self) {
|
pub fn begin_optimize(&self) {
|
||||||
// TODO record time
|
// TODO record time
|
||||||
self.translate_to(Box::new(QueryState::RUNNING(RUNNING::OPTIMIZING)));
|
self.translate_to(QueryState::RUNNING(RUNNING::OPTIMIZING));
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn end_optimize(&self) {
|
pub fn end_optimize(&self) {
|
||||||
@@ -166,7 +167,7 @@ impl QueryStateMachine {
|
|||||||
|
|
||||||
pub fn begin_schedule(&self) {
|
pub fn begin_schedule(&self) {
|
||||||
// TODO
|
// TODO
|
||||||
self.translate_to(Box::new(QueryState::RUNNING(RUNNING::SCHEDULING)));
|
self.translate_to(QueryState::RUNNING(RUNNING::SCHEDULING));
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn end_schedule(&self) {
|
pub fn end_schedule(&self) {
|
||||||
@@ -175,29 +176,29 @@ impl QueryStateMachine {
|
|||||||
|
|
||||||
pub fn finish(&self) {
|
pub fn finish(&self) {
|
||||||
// TODO
|
// TODO
|
||||||
self.translate_to(Box::new(QueryState::DONE(DONE::FINISHED)));
|
self.translate_to(QueryState::DONE(DONE::FINISHED));
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn cancel(&self) {
|
pub fn cancel(&self) {
|
||||||
// TODO
|
// TODO
|
||||||
self.translate_to(Box::new(QueryState::DONE(DONE::CANCELLED)));
|
self.translate_to(QueryState::DONE(DONE::CANCELLED));
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn fail(&self) {
|
pub fn fail(&self) {
|
||||||
// TODO
|
// TODO
|
||||||
self.translate_to(Box::new(QueryState::DONE(DONE::FAILED)));
|
self.translate_to(QueryState::DONE(DONE::FAILED));
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn state(&self) -> &QueryState {
|
pub fn state(&self) -> QueryState {
|
||||||
unsafe { &*self.state.load(Ordering::Relaxed) }
|
self.state.read().clone()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn duration(&self) -> Duration {
|
pub fn duration(&self) -> Duration {
|
||||||
self.start.elapsed()
|
self.start.elapsed()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn translate_to(&self, state: Box<QueryState>) {
|
fn translate_to(&self, state: QueryState) {
|
||||||
self.state.store(Box::into_raw(state), Ordering::Relaxed);
|
*self.state.write() = state;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user