Signed-off-by: junxiang Mu <1948535941@qq.com>
junxiang Mu
2025-03-13 09:43:53 +00:00
parent 63ef986bac
commit 0b270bf0cc
41 changed files with 4402 additions and 1256 deletions

Cargo.lock generated
View File

@@ -183,9 +183,19 @@ name = "api"
version = "0.0.1"
dependencies = [
"async-trait",
"bytes",
"chrono",
"datafusion",
"ecstore",
"futures",
"futures-core",
"http",
"object_store",
"s3s",
"snafu",
"tokio",
"tracing",
"url",
]
[[package]]
@@ -5711,7 +5721,7 @@ dependencies = [
"crypto",
"futures",
"ipnetwork",
"itertools",
"itertools 0.14.0",
"jsonwebtoken",
"lazy_static",
"log",
@@ -5989,9 +5999,14 @@ name = "query"
version = "0.0.1"
dependencies = [
"api",
"async-recursion",
"async-trait",
"datafusion",
"derive_builder",
"futures",
"parking_lot 0.12.3",
"s3s",
"snafu",
"tokio",
"tracing",
]
@@ -6190,22 +6205,6 @@ version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "20675572f6f24e9e76ef639bc5552774ed45f1c30e2951e1e99c59888861c539"
[[package]]
name = "reader"
version = "0.0.1"
dependencies = [
"bytes",
"futures",
"hex-simd",
"md-5",
"pin-project-lite",
"s3s",
"sha2 0.11.0-pre.4",
"thiserror 2.0.11",
"tokio",
"tracing",
]
[[package]]
name = "recursive"
version = "0.1.1"
@@ -6524,6 +6523,7 @@ dependencies = [
name = "rustfs"
version = "0.1.0"
dependencies = [
"api",
"async-trait",
"atoi",
"axum",
@@ -6533,6 +6533,8 @@ dependencies = [
"common",
"const-str",
"crypto",
"csv",
"datafusion",
"ecstore",
"flatbuffers",
"futures",
@@ -6560,6 +6562,7 @@ dependencies = [
"prost-types",
"protobuf",
"protos",
"query",
"rmp-serde",
"rust-embed",
"s3s",

View File

@@ -31,6 +31,7 @@ all = "warn"
[workspace.dependencies]
madmin = { path = "./madmin" }
async-recursion = "1.0.5"
async-trait = "0.1.86"
backon = "1.3.0"
bytes = "1.9.0"

View File

@@ -1,9 +1,10 @@
// automatically generated by the FlatBuffers compiler, do not modify
// @generated
use core::cmp::Ordering;
use core::mem;
use core::cmp::Ordering;
extern crate flatbuffers;
use self::flatbuffers::{EndianScalar, Follow};
@@ -11,114 +12,112 @@ use self::flatbuffers::{EndianScalar, Follow};
#[allow(unused_imports, dead_code)]
pub mod models {
use core::cmp::Ordering;
use core::mem;
use core::mem;
use core::cmp::Ordering;
extern crate flatbuffers;
use self::flatbuffers::{EndianScalar, Follow};
extern crate flatbuffers;
use self::flatbuffers::{EndianScalar, Follow};
pub enum PingBodyOffset {}
#[derive(Copy, Clone, PartialEq)]
pub enum PingBodyOffset {}
#[derive(Copy, Clone, PartialEq)]
pub struct PingBody<'a> {
pub _tab: flatbuffers::Table<'a>,
pub struct PingBody<'a> {
pub _tab: flatbuffers::Table<'a>,
}
impl<'a> flatbuffers::Follow<'a> for PingBody<'a> {
type Inner = PingBody<'a>;
#[inline]
unsafe fn follow(buf: &'a [u8], loc: usize) -> Self::Inner {
Self { _tab: flatbuffers::Table::new(buf, loc) }
}
}
impl<'a> PingBody<'a> {
pub const VT_PAYLOAD: flatbuffers::VOffsetT = 4;
pub const fn get_fully_qualified_name() -> &'static str {
"models.PingBody"
}
#[inline]
pub unsafe fn init_from_table(table: flatbuffers::Table<'a>) -> Self {
PingBody { _tab: table }
}
#[allow(unused_mut)]
pub fn create<'bldr: 'args, 'args: 'mut_bldr, 'mut_bldr, A: flatbuffers::Allocator + 'bldr>(
_fbb: &'mut_bldr mut flatbuffers::FlatBufferBuilder<'bldr, A>,
args: &'args PingBodyArgs<'args>
) -> flatbuffers::WIPOffset<PingBody<'bldr>> {
let mut builder = PingBodyBuilder::new(_fbb);
if let Some(x) = args.payload { builder.add_payload(x); }
builder.finish()
}
#[inline]
pub fn payload(&self) -> Option<flatbuffers::Vector<'a, u8>> {
// Safety:
// Created from valid Table for this object
// which contains a valid value in this slot
unsafe { self._tab.get::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'a, u8>>>(PingBody::VT_PAYLOAD, None)}
}
}
impl flatbuffers::Verifiable for PingBody<'_> {
#[inline]
fn run_verifier(
v: &mut flatbuffers::Verifier, pos: usize
) -> Result<(), flatbuffers::InvalidFlatbuffer> {
use self::flatbuffers::Verifiable;
v.visit_table(pos)?
.visit_field::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'_, u8>>>("payload", Self::VT_PAYLOAD, false)?
.finish();
Ok(())
}
}
pub struct PingBodyArgs<'a> {
pub payload: Option<flatbuffers::WIPOffset<flatbuffers::Vector<'a, u8>>>,
}
impl<'a> Default for PingBodyArgs<'a> {
#[inline]
fn default() -> Self {
PingBodyArgs {
payload: None,
}
}
}
impl<'a> flatbuffers::Follow<'a> for PingBody<'a> {
type Inner = PingBody<'a>;
#[inline]
unsafe fn follow(buf: &'a [u8], loc: usize) -> Self::Inner {
Self {
_tab: flatbuffers::Table::new(buf, loc),
}
}
pub struct PingBodyBuilder<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> {
fbb_: &'b mut flatbuffers::FlatBufferBuilder<'a, A>,
start_: flatbuffers::WIPOffset<flatbuffers::TableUnfinishedWIPOffset>,
}
impl<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> PingBodyBuilder<'a, 'b, A> {
#[inline]
pub fn add_payload(&mut self, payload: flatbuffers::WIPOffset<flatbuffers::Vector<'b , u8>>) {
self.fbb_.push_slot_always::<flatbuffers::WIPOffset<_>>(PingBody::VT_PAYLOAD, payload);
}
#[inline]
pub fn new(_fbb: &'b mut flatbuffers::FlatBufferBuilder<'a, A>) -> PingBodyBuilder<'a, 'b, A> {
let start = _fbb.start_table();
PingBodyBuilder {
fbb_: _fbb,
start_: start,
}
}
#[inline]
pub fn finish(self) -> flatbuffers::WIPOffset<PingBody<'a>> {
let o = self.fbb_.end_table(self.start_);
flatbuffers::WIPOffset::new(o.value())
}
}
impl<'a> PingBody<'a> {
pub const VT_PAYLOAD: flatbuffers::VOffsetT = 4;
impl core::fmt::Debug for PingBody<'_> {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
let mut ds = f.debug_struct("PingBody");
ds.field("payload", &self.payload());
ds.finish()
}
}
} // pub mod models
pub const fn get_fully_qualified_name() -> &'static str {
"models.PingBody"
}
#[inline]
pub unsafe fn init_from_table(table: flatbuffers::Table<'a>) -> Self {
PingBody { _tab: table }
}
#[allow(unused_mut)]
pub fn create<'bldr: 'args, 'args: 'mut_bldr, 'mut_bldr, A: flatbuffers::Allocator + 'bldr>(
_fbb: &'mut_bldr mut flatbuffers::FlatBufferBuilder<'bldr, A>,
args: &'args PingBodyArgs<'args>,
) -> flatbuffers::WIPOffset<PingBody<'bldr>> {
let mut builder = PingBodyBuilder::new(_fbb);
if let Some(x) = args.payload {
builder.add_payload(x);
}
builder.finish()
}
#[inline]
pub fn payload(&self) -> Option<flatbuffers::Vector<'a, u8>> {
// Safety:
// Created from valid Table for this object
// which contains a valid value in this slot
unsafe {
self._tab
.get::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'a, u8>>>(PingBody::VT_PAYLOAD, None)
}
}
}
impl flatbuffers::Verifiable for PingBody<'_> {
#[inline]
fn run_verifier(v: &mut flatbuffers::Verifier, pos: usize) -> Result<(), flatbuffers::InvalidFlatbuffer> {
use self::flatbuffers::Verifiable;
v.visit_table(pos)?
.visit_field::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'_, u8>>>("payload", Self::VT_PAYLOAD, false)?
.finish();
Ok(())
}
}
pub struct PingBodyArgs<'a> {
pub payload: Option<flatbuffers::WIPOffset<flatbuffers::Vector<'a, u8>>>,
}
impl<'a> Default for PingBodyArgs<'a> {
#[inline]
fn default() -> Self {
PingBodyArgs { payload: None }
}
}
pub struct PingBodyBuilder<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> {
fbb_: &'b mut flatbuffers::FlatBufferBuilder<'a, A>,
start_: flatbuffers::WIPOffset<flatbuffers::TableUnfinishedWIPOffset>,
}
impl<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> PingBodyBuilder<'a, 'b, A> {
#[inline]
pub fn add_payload(&mut self, payload: flatbuffers::WIPOffset<flatbuffers::Vector<'b, u8>>) {
self.fbb_
.push_slot_always::<flatbuffers::WIPOffset<_>>(PingBody::VT_PAYLOAD, payload);
}
#[inline]
pub fn new(_fbb: &'b mut flatbuffers::FlatBufferBuilder<'a, A>) -> PingBodyBuilder<'a, 'b, A> {
let start = _fbb.start_table();
PingBodyBuilder {
fbb_: _fbb,
start_: start,
}
}
#[inline]
pub fn finish(self) -> flatbuffers::WIPOffset<PingBody<'a>> {
let o = self.fbb_.end_table(self.start_);
flatbuffers::WIPOffset::new(o.value())
}
}
impl core::fmt::Debug for PingBody<'_> {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
let mut ds = f.debug_struct("PingBody");
ds.field("payload", &self.payload());
ds.finish()
}
}
} // pub mod models
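For reference, a round trip through the generated PingBody API might look like the sketch below; it is not part of the commit, and the payload bytes are illustrative.

use flatbuffers::FlatBufferBuilder;

// Build a PingBody with a small payload, then read it back through the verifier.
fn ping_body_roundtrip() {
    let mut fbb = FlatBufferBuilder::new();
    let payload = fbb.create_vector(&[1u8, 2, 3]);
    let body = models::PingBody::create(&mut fbb, &models::PingBodyArgs { payload: Some(payload) });
    fbb.finish(body, None);
    // `root` verifies the buffer before handing back a PingBody view.
    let parsed = flatbuffers::root::<models::PingBody>(fbb.finished_data()).expect("valid buffer");
    assert_eq!(parsed.payload().map(|v| v.bytes()), Some(&[1u8, 2, 3][..]));
}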

File diff suppressed because it is too large

View File

@@ -20,6 +20,8 @@ log.workspace = true
async-trait.workspace = true
bytes.workspace = true
clap.workspace = true
csv = "1.3.1"
datafusion = { workspace = true }
common.workspace = true
ecstore.workspace = true
policy.workspace = true
@@ -69,6 +71,8 @@ const-str = { version = "0.6.1", features = ["std", "proc"] }
atoi = "2.0.0"
serde_urlencoded = "0.7.1"
crypto = { path = "../crypto" }
query = { path = "../s3select/query" }
api = { path = "../s3select/api" }
iam = { path = "../iam" }
jsonwebtoken = "9.3.0"
tower-http = { version = "0.6.2", features = ["cors"] }

View File

@@ -4,8 +4,11 @@ use super::options::extract_metadata;
use super::options::put_opts;
use crate::auth::get_condition_values;
use crate::storage::access::ReqInfo;
use api::query::Context;
use api::query::Query;
use bytes::Bytes;
use common::error::Result;
use datafusion::arrow::json::writer::JsonArray;
use ecstore::bucket::error::BucketMetadataError;
use ecstore::bucket::metadata::BUCKET_LIFECYCLE_CONFIG;
use ecstore::bucket::metadata::BUCKET_NOTIFICATION_CONFIG;
@@ -39,6 +42,9 @@ use ecstore::xhttp;
use futures::pin_mut;
use futures::{Stream, StreamExt};
use http::HeaderMap;
use iam::policy::action::Action;
use api::server::dbms::DatabaseManagerSystem;
use iam::policy::action::S3Action;
use lazy_static::lazy_static;
use log::warn;
use policy::auth;
@@ -47,6 +53,7 @@ use policy::policy::action::S3Action;
use policy::policy::BucketPolicy;
use policy::policy::BucketPolicyArgs;
use policy::policy::Validator;
use query::instance::make_cnosdbms;
use s3s::dto::*;
use s3s::s3_error;
use s3s::S3Error;
@@ -54,6 +61,8 @@ use s3s::S3ErrorCode;
use s3s::S3Result;
use s3s::S3;
use s3s::{S3Request, S3Response};
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use std::fmt::Debug;
use std::str::FromStr;
use tokio_util::io::ReaderStream;
@@ -1859,6 +1868,70 @@ impl S3 for FS {
}
Ok(S3Response::new(PutObjectAclOutput::default()))
}
async fn select_object_content(
&self,
req: S3Request<SelectObjectContentInput>,
) -> S3Result<S3Response<SelectObjectContentOutput>> {
info!("handle select_object_content");
let input = req.input;
info!("{:?}", input);
let db = make_cnosdbms(input.clone()).await.map_err(|_| s3_error!(InternalError))?;
let query = Query::new(Context { input: input.clone() }, input.request.expression);
let result = db.execute(&query).await.map_err(|_| {
s3_error!(InternalError)
})?;
let results = result.result().chunk_result().await.unwrap().to_vec();
let mut buffer = Vec::new();
if input.request.output_serialization.csv.is_some() {
let mut csv_writer = CsvWriterBuilder::new().with_header(false).build(&mut buffer);
for batch in results {
csv_writer
.write(&batch)
.map_err(|e| s3_error!(InternalError, "can't encode output to CSV: {}", e.to_string()))?;
}
} else if input.request.output_serialization.json.is_some() {
let mut json_writer = JsonWriterBuilder::new()
.with_explicit_nulls(true)
.build::<_, JsonArray>(&mut buffer);
for batch in results {
json_writer
.write(&batch)
.map_err(|e| s3_error!(InternalError, "can't encode output to JSON: {}", e.to_string()))?;
}
json_writer
.finish()
.map_err(|e| s3_error!(InternalError, "failed to finish JSON output: {}", e.to_string()))?;
} else {
return Err(s3_error!(InvalidArgument, "unknown output format"));
}
let (tx, rx) = mpsc::channel::<S3Result<SelectObjectContentEvent>>(2);
let stream = ReceiverStream::new(rx);
tokio::spawn(async move {
let _ = tx
.send(Ok(SelectObjectContentEvent::Cont(ContinuationEvent::default())))
.await;
let _ = tx
.send(Ok(SelectObjectContentEvent::Records(RecordsEvent {
payload: Some(Bytes::from(buffer)),
})))
.await;
let _ = tx.send(Ok(SelectObjectContentEvent::End(EndEvent::default()))).await;
drop(tx);
});
Ok(S3Response::new(SelectObjectContentOutput {
payload: Some(SelectObjectContentEventStream::new(stream)),
}))
}
}
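Aside: the CSV branch above is just the arrow CSV writer over an in-memory buffer. A minimal standalone sketch (helper name and error handling are illustrative, not from the commit):

use datafusion::arrow::csv::WriterBuilder as CsvWriterBuilder;
use datafusion::arrow::record_batch::RecordBatch;

// Serialize record batches into the headerless CSV payload carried by RecordsEvent.
fn batches_to_csv(batches: &[RecordBatch]) -> Vec<u8> {
    let mut buffer = Vec::new();
    {
        let mut writer = CsvWriterBuilder::new().with_header(false).build(&mut buffer);
        for batch in batches {
            writer.write(batch).expect("encode batch as CSV");
        }
    } // writer is dropped here, releasing its borrow of `buffer`
    buffer
}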
#[allow(dead_code)]

View File

@@ -5,6 +5,16 @@ edition.workspace = true
[dependencies]
async-trait.workspace = true
bytes.workspace = true
chrono.workspace = true
datafusion = { workspace = true }
ecstore.workspace = true
futures = { workspace = true }
futures-core = "0.3.31"
http.workspace = true
object_store = "0.11.2"
s3s.workspace = true
snafu = { workspace = true, features = ["backtrace"] }
tokio.workspace = true
tracing.workspace = true
url.workspace = true

View File

@@ -1,8 +1,9 @@
use std::fmt::Display;
use datafusion::common::DataFusionError;
use datafusion::{common::DataFusionError, sql::sqlparser::parser::ParserError};
use snafu::{Backtrace, Location, Snafu};
pub mod object_store;
pub mod query;
pub mod server;
@@ -16,6 +17,21 @@ pub enum QueryError {
location: Location,
backtrace: Backtrace,
},
#[snafu(display("This feature is not implemented: {}", err))]
NotImplemented { err: String },
#[snafu(display("Multi-statement not allowed, found num: {}, sql: {}", num, sql))]
MultiStatement { num: usize, sql: String },
#[snafu(display("Failed to build QueryDispatcher. err: {}", err))]
BuildQueryDispatcher { err: String },
#[snafu(display("The query has been canceled"))]
Cancel,
#[snafu(display("{}", source))]
Parser { source: ParserError },
}
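To show how these variants are raised in practice, a small sketch (the helper is hypothetical; it mirrors the multi-statement check in the dispatcher further down):

// Reject multi-statement input before planning.
fn ensure_single_statement(num: usize, sql: &str) -> Result<(), QueryError> {
    if num != 1 {
        return Err(QueryError::MultiStatement { num, sql: sql.to_string() });
    }
    Ok(())
}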
impl From<DataFusionError> for QueryError {

View File

@@ -0,0 +1,150 @@
use async_trait::async_trait;
use bytes::Bytes;
use chrono::Utc;
use ecstore::new_object_layer_fn;
use ecstore::store::ECStore;
use ecstore::store_api::ObjectIO;
use ecstore::store_api::ObjectOptions;
use ecstore::StorageAPI;
use futures::stream;
use futures::StreamExt;
use futures_core::stream::BoxStream;
use http::HeaderMap;
use object_store::path::Path;
use object_store::Attributes;
use object_store::GetOptions;
use object_store::GetResult;
use object_store::ListResult;
use object_store::MultipartUpload;
use object_store::ObjectMeta;
use object_store::ObjectStore;
use object_store::PutMultipartOpts;
use object_store::PutOptions;
use object_store::PutPayload;
use object_store::PutResult;
use object_store::{Error as o_Error, Result};
use s3s::dto::SelectObjectContentInput;
use s3s::s3_error;
use s3s::S3Result;
use std::ops::Range;
use std::sync::Arc;
use tracing::info;
#[derive(Debug)]
pub struct EcObjectStore {
input: SelectObjectContentInput,
store: Arc<ECStore>,
}
impl EcObjectStore {
pub fn new(input: SelectObjectContentInput) -> S3Result<Self> {
let Some(store) = new_object_layer_fn() else {
return Err(s3_error!(InternalError, "ec store not inited"));
};
Ok(Self { input, store })
}
}
impl std::fmt::Display for EcObjectStore {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str("EcObjectStore")
}
}
#[async_trait]
impl ObjectStore for EcObjectStore {
async fn put_opts(&self, _location: &Path, _payload: PutPayload, _opts: PutOptions) -> Result<PutResult> {
unimplemented!()
}
async fn put_multipart_opts(&self, _location: &Path, _opts: PutMultipartOpts) -> Result<Box<dyn MultipartUpload>> {
unimplemented!()
}
async fn get_opts(&self, location: &Path, _options: GetOptions) -> Result<GetResult> {
info!("{:?}", location);
let opts = ObjectOptions::default();
let h = HeaderMap::new();
let reader = self
.store
.get_object_reader(&self.input.bucket, &self.input.key, None, h, &opts)
.await
.map_err(|_| o_Error::NotFound {
path: format!("{}/{}", self.input.bucket, self.input.key),
source: "cannot get object reader".into(),
})?;
let stream = stream::unfold(reader.stream, |mut blob| async move {
match blob.next().await {
Some(Ok(chunk)) => {
let bytes = Bytes::from(chunk);
Some((Ok(bytes), blob))
}
_ => None,
}
})
.boxed();
let meta = ObjectMeta {
location: location.clone(),
last_modified: Utc::now(),
size: reader.object_info.size,
e_tag: reader.object_info.etag,
version: None,
};
let attributes = Attributes::default();
Ok(GetResult {
payload: object_store::GetResultPayload::Stream(stream),
meta,
range: 0..reader.object_info.size,
attributes,
})
}
async fn get_ranges(&self, _location: &Path, _ranges: &[Range<usize>]) -> Result<Vec<Bytes>> {
unimplemented!()
}
async fn head(&self, location: &Path) -> Result<ObjectMeta> {
info!("{:?}", location);
let opts = ObjectOptions::default();
let info = self
.store
.get_object_info(&self.input.bucket, &self.input.key, &opts)
.await
.map_err(|_| o_Error::NotFound {
path: format!("{}/{}", self.input.bucket, self.input.key),
source: "can not get object info".into(),
})?;
Ok(ObjectMeta {
location: location.clone(),
last_modified: Utc::now(),
size: info.size,
e_tag: info.etag,
version: None,
})
}
async fn delete(&self, _location: &Path) -> Result<()> {
unimplemented!()
}
fn list(&self, _prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
unimplemented!()
}
async fn list_with_delimiter(&self, _prefix: Option<&Path>) -> Result<ListResult> {
unimplemented!()
}
async fn copy(&self, _from: &Path, _to: &Path) -> Result<()> {
unimplemented!()
}
async fn copy_if_not_exists(&self, _from: &Path, _to: &Path) -> Result<()> {
unimplemented!()
}
}
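A sketch of how this adapter plugs into DataFusion; the helper is illustrative, and the real wiring lives in SessionCtxFactory further down:

use datafusion::prelude::SessionContext;
use std::sync::Arc;

// Expose the bucket behind EcObjectStore to DataFusion under an s3:// URL.
fn register_store(ctx: &SessionContext, input: SelectObjectContentInput) -> S3Result<()> {
    let url = url::Url::parse(&format!("s3://{}", input.bucket)).expect("valid bucket URL");
    let store = Arc::new(EcObjectStore::new(input)?);
    ctx.register_object_store(&url, store);
    Ok(())
}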

View File

@@ -0,0 +1,12 @@
use std::sync::Arc;
use datafusion::logical_expr::LogicalPlan;
use super::session::SessionCtx;
use crate::QueryResult;
pub type AnalyzerRef = Arc<dyn Analyzer + Send + Sync>;
pub trait Analyzer {
fn analyze(&self, plan: &LogicalPlan, session: &SessionCtx) -> QueryResult<LogicalPlan>;
}

View File

@@ -12,10 +12,6 @@ use super::{
#[async_trait]
pub trait QueryDispatcher: Send + Sync {
async fn start(&self) -> QueryResult<()>;
fn stop(&self);
// fn create_query_id(&self) -> QueryId;
// fn query_info(&self, id: &QueryId);

View File

@@ -43,18 +43,6 @@ pub trait QueryExecution: Send + Sync {
async fn start(&self) -> QueryResult<Output>;
// stop
fn cancel(&self) -> QueryResult<()>;
// query state
// query plan
// static info
// fn info(&self) -> QueryInfo;
// runtime info
// fn status(&self) -> QueryStatus;
// sql
// resource usage (CPU time, memory, throughput, etc.)
// whether the query info needs to be persisted
fn need_persist(&self) -> bool {
false
}
}
pub enum Output {

View File

@@ -1,6 +1,12 @@
use async_trait::async_trait;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::logical_expr::LogicalPlan as DFPlan;
use crate::QueryResult;
use super::ast::ExtStatement;
use super::session::SessionCtx;
#[derive(Clone)]
pub enum Plan {
// only support query sql
@@ -27,3 +33,8 @@ impl QueryPlan {
matches!(self.df_plan, DFPlan::Explain(_) | DFPlan::Analyze(_))
}
}
#[async_trait]
pub trait LogicalPlanner {
async fn create_logical_plan(&self, statement: ExtStatement, session: &SessionCtx) -> QueryResult<Plan>;
}

View File

@@ -1,15 +1,22 @@
use s3s::dto::SelectObjectContentInput;
pub mod analyzer;
pub mod ast;
pub mod datasource;
pub mod dispatcher;
pub mod execution;
pub mod function;
pub mod logical_planner;
pub mod optimizer;
pub mod parser;
pub mod physical_planner;
pub mod scheduler;
pub mod session;
#[derive(Clone)]
pub struct Context {
// maybe we need to transfer some info?
pub input: SelectObjectContentInput,
}
#[derive(Clone)]
@@ -17,3 +24,18 @@ pub struct Query {
context: Context,
content: String,
}
impl Query {
#[inline(always)]
pub fn new(context: Context, content: String) -> Self {
Self { context, content }
}
pub fn context(&self) -> &Context {
&self.context
}
pub fn content(&self) -> &str {
self.content.as_str()
}
}

View File

@@ -0,0 +1,15 @@
use std::sync::Arc;
use async_trait::async_trait;
use datafusion::physical_plan::ExecutionPlan;
use super::logical_planner::QueryPlan;
use super::session::SessionCtx;
use crate::QueryResult;
pub type OptimizerRef = Arc<dyn Optimizer + Send + Sync>;
#[async_trait]
pub trait Optimizer {
async fn optimize(&self, plan: &QueryPlan, session: &SessionCtx) -> QueryResult<Arc<dyn ExecutionPlan>>;
}

View File

@@ -0,0 +1,21 @@
use std::sync::Arc;
use async_trait::async_trait;
use datafusion::logical_expr::LogicalPlan;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_planner::ExtensionPlanner;
use super::session::SessionCtx;
use crate::QueryResult;
#[async_trait]
pub trait PhysicalPlanner {
/// Given a `LogicalPlan`, create an `ExecutionPlan` suitable for execution
async fn create_physical_plan(
&self,
logical_plan: &LogicalPlan,
session_state: &SessionCtx,
) -> QueryResult<Arc<dyn ExecutionPlan>>;
fn inject_physical_transform_rule(&mut self, rule: Arc<dyn ExtensionPlanner + Send + Sync>);
}

View File

@@ -0,0 +1,32 @@
use std::sync::Arc;
use async_trait::async_trait;
use datafusion::common::Result;
use datafusion::execution::context::TaskContext;
use datafusion::physical_plan::{ExecutionPlan, SendableRecordBatchStream};
pub type SchedulerRef = Arc<dyn Scheduler + Send + Sync>;
#[async_trait]
pub trait Scheduler {
/// Schedule the provided [`ExecutionPlan`] on this [`Scheduler`].
///
/// Returns an [`ExecutionResults`] that can be used to receive results as they are produced,
/// as a [`futures::Stream`] of [`RecordBatch`]
async fn schedule(&self, plan: Arc<dyn ExecutionPlan>, context: Arc<TaskContext>) -> Result<ExecutionResults>;
}
pub struct ExecutionResults {
stream: SendableRecordBatchStream,
}
impl ExecutionResults {
pub fn new(stream: SendableRecordBatchStream) -> Self {
Self { stream }
}
/// Returns a [`SendableRecordBatchStream`] of this execution
pub fn stream(self) -> SendableRecordBatchStream {
self.stream
}
}
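For context, draining the returned stream could look like this (a sketch; collect_batches is a hypothetical helper):

use datafusion::arrow::record_batch::RecordBatch;
use futures::StreamExt;

// Collect every RecordBatch produced by a scheduled plan.
async fn collect_batches(results: ExecutionResults) -> datafusion::common::Result<Vec<RecordBatch>> {
    let mut stream = results.stream();
    let mut batches = Vec::new();
    while let Some(batch) = stream.next().await {
        batches.push(batch?);
    }
    Ok(batches)
}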

View File

@@ -1,10 +1,17 @@
use std::sync::Arc;
use datafusion::execution::context::SessionState;
use datafusion::{
execution::{context::SessionState, runtime_env::RuntimeEnvBuilder, SessionStateBuilder},
prelude::SessionContext,
};
use crate::{object_store::EcObjectStore, QueryError, QueryResult};
use super::Context;
#[derive(Clone)]
pub struct SessionCtx {
desc: Arc<SessionCtxDesc>,
_desc: Arc<SessionCtxDesc>,
inner: SessionState,
}
@@ -18,3 +25,33 @@ impl SessionCtx {
pub struct SessionCtxDesc {
// maybe we need some info
}
#[derive(Default)]
pub struct SessionCtxFactory {}
impl SessionCtxFactory {
pub fn create_session_ctx(&self, context: &Context) -> QueryResult<SessionCtx> {
let df_session_ctx = self.build_df_session_context(context)?;
Ok(SessionCtx {
_desc: Arc::new(SessionCtxDesc {}),
inner: df_session_ctx.state(),
})
}
fn build_df_session_context(&self, context: &Context) -> QueryResult<SessionContext> {
let path = format!("s3://{}", context.input.bucket);
let store_url = url::Url::parse(&path).unwrap();
let store = EcObjectStore::new(context.input.clone()).map_err(|_| QueryError::NotImplemented { err: String::new() })?;
let rt = RuntimeEnvBuilder::new().build()?;
let df_session_state = SessionStateBuilder::new()
.with_runtime_env(Arc::new(rt))
.with_object_store(&store_url, Arc::new(store))
.with_default_features()
.build();
let df_session_ctx = SessionContext::new_with_state(df_session_state);
Ok(df_session_ctx)
}
}
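Hypothetical usage sketch (not in the commit): once the session state carries the s3:// store, DataFusion can read the selected object directly, e.g. as headerless CSV.

use datafusion::prelude::{CsvReadOptions, SessionContext};

// Peek at the selected object through the registered EcObjectStore.
async fn peek_object(session: &SessionCtx, bucket: &str, key: &str) -> QueryResult<()> {
    let ctx = SessionContext::new_with_state(session.inner().clone());
    let df = ctx
        .read_csv(format!("s3://{bucket}/{key}"), CsvReadOptions::new().has_header(false))
        .await?;
    df.show().await?;
    Ok(())
}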

View File

@@ -30,7 +30,6 @@ impl QueryHandle {
#[async_trait]
pub trait DatabaseManagerSystem {
async fn start(&self) -> QueryResult<()>;
async fn execute(&self, query: &Query) -> QueryResult<QueryHandle>;
async fn build_query_state_machine(&self, query: Query) -> QueryResult<QueryStateMachineRef>;
async fn build_logical_plan(&self, query_state_machine: QueryStateMachineRef) -> QueryResult<Option<Plan>>;
@@ -39,5 +38,4 @@ pub trait DatabaseManagerSystem {
logical_plan: Plan,
query_state_machine: QueryStateMachineRef,
) -> QueryResult<QueryHandle>;
fn metrics(&self) -> String;
}

View File

@@ -5,8 +5,13 @@ edition.workspace = true
[dependencies]
api = { path = "../api" }
async-recursion = { workspace = true }
async-trait.workspace = true
datafusion = { workspace = true }
derive_builder = { workspace = true }
futures = { workspace = true }
parking_lot = { version = "0.12.1" }
s3s.workspace = true
snafu = { workspace = true, features = ["backtrace"] }
tokio = { workspace = true }
tracing = { workspace = true }

View File

@@ -1,4 +1,5 @@
use std::any::Any;
use std::borrow::Cow;
use std::fmt::Display;
use std::sync::Arc;
use std::write;
@@ -10,7 +11,6 @@ use datafusion::datasource::listing::ListingTable;
use datafusion::datasource::{provider_as_source, TableProvider};
use datafusion::error::DataFusionError;
use datafusion::logical_expr::{LogicalPlan, LogicalPlanBuilder, TableProviderFilterPushDown, TableSource};
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::Expr;
use datafusion::sql::TableReference;
use tracing::debug;
@@ -28,11 +28,9 @@ pub struct TableSourceAdapter {
impl TableSourceAdapter {
pub fn try_new(
table_ref: impl Into<TableReference>,
database_name: impl Into<String>,
table_name: impl Into<String>,
table_handle: impl Into<TableHandle>,
) -> Result<Self, DataFusionError> {
let database_name = database_name.into();
let table_name: String = table_name.into();
let table_handle = table_handle.into();
@@ -46,7 +44,7 @@ impl TableSourceAdapter {
TableHandle::TableProvider(t) => {
let table_source = provider_as_source(t.clone());
if let Some(plan) = table_source.get_logical_plan() {
LogicalPlanBuilder::from(plan.clone()).build()?
LogicalPlanBuilder::from(plan.into_owned()).build()?
} else {
LogicalPlanBuilder::scan(table_ref, table_source, None)?.build()?
}
@@ -91,8 +89,8 @@ impl TableSource for TableSourceAdapter {
}
/// Called by [`InlineTableScan`]
fn get_logical_plan(&self) -> Option<&LogicalPlan> {
Some(&self.plan)
fn get_logical_plan(&self) -> Option<Cow<LogicalPlan>> {
Some(Cow::Owned(self.plan.clone()))
}
}

View File

@@ -1,51 +1,62 @@
use std::{
collections::HashMap,
pin::Pin,
sync::{Arc, Mutex},
task::{Context, Poll},
};
use api::{
query::{
ast::ExtStatement,
dispatcher::QueryDispatcher,
execution::{Output, QueryStateMachine},
function::FuncMetaManagerRef,
logical_planner::Plan,
logical_planner::{LogicalPlanner, Plan},
parser::Parser,
session::{SessionCtx, SessionCtxFactory},
Query,
},
QueryError, QueryResult,
};
use async_trait::async_trait;
use datafusion::{
arrow::{datatypes::SchemaRef, record_batch::RecordBatch},
config::CsvOptions,
datasource::{
file_format::{csv::CsvFormat, json::JsonFormat, parquet::ParquetFormat},
listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl},
},
error::Result as DFResult,
execution::{RecordBatchStream, SendableRecordBatchStream},
};
use futures::{Stream, StreamExt};
use s3s::dto::SelectObjectContentInput;
use tokio::task::JoinHandle;
use crate::metadata::TableHandleProviderRef;
use crate::{
execution::factory::QueryExecutionFactoryRef,
metadata::{
base_table::BaseTableProvider,
ContextProviderExtension, MetadataProvider, TableHandleProviderRef,
},
sql::logical::planner::DefaultLogicalPlanner,
};
#[derive(Clone)]
pub struct SimpleQueryDispatcher {
input: SelectObjectContentInput,
// client for default tenant
default_table_provider: TableHandleProviderRef,
// memory pool
// memory_pool: MemoryPoolRef,
// query tracker
session_factory: Arc<SessionCtxFactory>,
// parser
parser: Arc<dyn Parser + Send + Sync>,
// get query execution factory
query_execution_factory: QueryExecutionFactoryRef,
func_manager: FuncMetaManagerRef,
async_task_joinhandle: Arc<Mutex<HashMap<String, JoinHandle<()>>>>,
failed_task_joinhandle: Arc<Mutex<HashMap<String, JoinHandle<()>>>>,
}
#[async_trait]
impl QueryDispatcher for SimpleQueryDispatcher {
async fn start(&self) -> QueryResult<()> {
self.execute_persister_query(self.coord.node_id()).await
}
fn stop(&self) {
// TODO
}
async fn execute_query(&self, query: &Query) -> QueryResult<Output> {
let query_state_machine = { self.build_query_state_machine(query.clone()).await? };
@@ -66,7 +77,6 @@ impl QueryDispatcher for SimpleQueryDispatcher {
let logical_planner = DefaultLogicalPlanner::new(&scheme_provider);
let span_recorder = session.get_child_span("parse sql");
let statements = self.parser.parse(query.content())?;
// multi-statement is not allowed
@@ -82,8 +92,6 @@ impl QueryDispatcher for SimpleQueryDispatcher {
None => return Ok(None),
};
drop(span_recorder);
let logical_plan = self
.statement_to_logical_plan(stmt, &logical_planner, query_state_machine)
.await?;
@@ -95,9 +103,7 @@ impl QueryDispatcher for SimpleQueryDispatcher {
}
async fn build_query_state_machine(&self, query: Query) -> QueryResult<Arc<QueryStateMachine>> {
let session = self
.session_factory
.create_session_ctx(query.context(), self.memory_pool.clone(), self.coord.clone())?;
let session = self.session_factory.create_session_ctx(query.context())?;
let query_state_machine = Arc::new(QueryStateMachine::begin(query, session));
Ok(query_state_machine)
@@ -114,7 +120,7 @@ impl SimpleQueryDispatcher {
// begin analyze
query_state_machine.begin_analyze();
let logical_plan = logical_planner
.create_logical_plan(stmt, &query_state_machine.session, self.coord.get_config().query.auth_enabled)
.create_logical_plan(stmt, &query_state_machine.session)
.await?;
query_state_machine.end_analyze();
@@ -127,79 +133,85 @@ impl SimpleQueryDispatcher {
.create_query_execution(logical_plan, query_state_machine.clone())
.await?;
// TrackedQuery.drop() is called implicitly when the value goes out of scope,
self.query_tracker
.try_track_query(query_state_machine.query_id, execution)
.await?
.start()
.await
match execution.start().await {
Ok(Output::StreamData(stream)) => Ok(Output::StreamData(Box::pin(TrackedRecordBatchStream { inner: stream }))),
Ok(nil @ Output::Nil(_)) => Ok(nil),
Err(err) => Err(err),
}
}
async fn build_scheme_provider(&self, session: &SessionCtx) -> QueryResult<MetadataProvider> {
let meta_client = self.build_current_session_meta_client(session).await?;
let current_session_table_provider = self.build_table_handle_provider(meta_client.clone())?;
let metadata_provider = MetadataProvider::new(
self.coord.clone(),
meta_client,
current_session_table_provider,
self.default_table_provider.clone(),
self.func_manager.clone(),
self.query_tracker.clone(),
session.clone(),
);
let path = format!("s3://{}/{}", self.input.bucket, self.input.key);
let table_path = ListingTableUrl::parse(path)?;
let listing_options = if self.input.request.input_serialization.csv.is_some() {
let file_format = CsvFormat::default().with_options(CsvOptions::default().with_has_header(false));
ListingOptions::new(Arc::new(file_format)).with_file_extension(".csv")
} else if self.input.request.input_serialization.parquet.is_some() {
let file_format = ParquetFormat::new();
ListingOptions::new(Arc::new(file_format)).with_file_extension(".parquet")
} else if self.input.request.input_serialization.json.is_some() {
let file_format = JsonFormat::default();
ListingOptions::new(Arc::new(file_format)).with_file_extension(".json")
} else {
return Err(QueryError::NotImplemented {
err: "this file type is not supported".to_string(),
});
};
let resolve_schema = listing_options.infer_schema(session.inner(), &table_path).await?;
let config = ListingTableConfig::new(table_path)
.with_listing_options(listing_options)
.with_schema(resolve_schema);
let provider = Arc::new(ListingTable::try_new(config)?);
let current_session_table_provider = self.build_table_handle_provider()?;
let metadata_provider =
MetadataProvider::new(provider, current_session_table_provider, self.func_manager.clone(), session.clone());
Ok(metadata_provider)
}
async fn build_current_session_meta_client(&self, session: &SessionCtx) -> QueryResult<MetaClientRef> {
let meta_client = self
.coord
.tenant_meta(session.tenant())
.await
.ok_or_else(|| MetaError::TenantNotFound {
tenant: session.tenant().to_string(),
})
.context(MetaSnafu)?;
Ok(meta_client)
}
fn build_table_handle_provider(&self, meta_client: MetaClientRef) -> QueryResult<TableHandleProviderRef> {
let current_session_table_provider: Arc<BaseTableProvider> = Arc::new(BaseTableProvider::new(
self.coord.clone(),
self.split_manager.clone(),
meta_client,
self.stream_provider_manager.clone(),
));
fn build_table_handle_provider(&self) -> QueryResult<TableHandleProviderRef> {
let current_session_table_provider: Arc<BaseTableProvider> = Arc::new(BaseTableProvider::default());
Ok(current_session_table_provider)
}
}
pub struct TrackedRecordBatchStream {
inner: SendableRecordBatchStream,
}
impl RecordBatchStream for TrackedRecordBatchStream {
fn schema(&self) -> SchemaRef {
self.inner.schema()
}
}
impl Stream for TrackedRecordBatchStream {
type Item = DFResult<RecordBatch>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.inner.poll_next_unpin(cx)
}
}
#[derive(Default, Clone)]
pub struct SimpleQueryDispatcherBuilder {
coord: Option<CoordinatorRef>,
input: Option<SelectObjectContentInput>,
default_table_provider: Option<TableHandleProviderRef>,
split_manager: Option<SplitManagerRef>,
session_factory: Option<Arc<SessionCtxFactory>>,
parser: Option<Arc<dyn Parser + Send + Sync>>,
query_execution_factory: Option<QueryExecutionFactoryRef>,
query_tracker: Option<Arc<QueryTracker>>,
memory_pool: Option<MemoryPoolRef>, // memory
func_manager: Option<FuncMetaManagerRef>,
stream_provider_manager: Option<StreamProviderManagerRef>,
span_ctx: Option<SpanContext>,
auth_cache: Option<Arc<AuthCache<AuthCacheKey, User>>>,
}
impl SimpleQueryDispatcherBuilder {
pub fn with_coord(mut self, coord: CoordinatorRef) -> Self {
self.coord = Some(coord);
pub fn with_input(mut self, input: SelectObjectContentInput) -> Self {
self.input = Some(input);
self
}
pub fn with_default_table_provider(mut self, default_table_provider: TableHandleProviderRef) -> Self {
self.default_table_provider = Some(default_table_provider);
self
@@ -210,11 +222,6 @@ impl SimpleQueryDispatcherBuilder {
self
}
pub fn with_split_manager(mut self, split_manager: SplitManagerRef) -> Self {
self.split_manager = Some(split_manager);
self
}
pub fn with_parser(mut self, parser: Arc<dyn Parser + Send + Sync>) -> Self {
self.parser = Some(parser);
self
@@ -225,43 +232,14 @@ impl SimpleQueryDispatcherBuilder {
self
}
pub fn with_query_tracker(mut self, query_tracker: Arc<QueryTracker>) -> Self {
self.query_tracker = Some(query_tracker);
self
}
pub fn with_memory_pool(mut self, memory_pool: MemoryPoolRef) -> Self {
self.memory_pool = Some(memory_pool);
self
}
pub fn with_func_manager(mut self, func_manager: FuncMetaManagerRef) -> Self {
self.func_manager = Some(func_manager);
self
}
pub fn with_stream_provider_manager(mut self, stream_provider_manager: StreamProviderManagerRef) -> Self {
self.stream_provider_manager = Some(stream_provider_manager);
self
}
pub fn with_span_ctx(mut self, span_ctx: Option<SpanContext>) -> Self {
self.span_ctx = span_ctx;
self
}
pub fn with_auth_cache(mut self, auth_cache: Arc<AuthCache<AuthCacheKey, User>>) -> Self {
self.auth_cache = Some(auth_cache);
self
}
pub fn build(self) -> QueryResult<Arc<SimpleQueryDispatcher>> {
let coord = self.coord.ok_or_else(|| QueryError::BuildQueryDispatcher {
err: "lost of coord".to_string(),
})?;
let split_manager = self.split_manager.ok_or_else(|| QueryError::BuildQueryDispatcher {
err: "lost of split manager".to_string(),
let input = self.input.ok_or_else(|| QueryError::BuildQueryDispatcher {
err: "lost of input".to_string(),
})?;
let session_factory = self.session_factory.ok_or_else(|| QueryError::BuildQueryDispatcher {
@@ -276,56 +254,23 @@ impl SimpleQueryDispatcherBuilder {
err: "lost of query_execution_factory".to_string(),
})?;
let query_tracker = self.query_tracker.ok_or_else(|| QueryError::BuildQueryDispatcher {
err: "lost of query_tracker".to_string(),
})?;
let func_manager = self.func_manager.ok_or_else(|| QueryError::BuildQueryDispatcher {
err: "lost of func_manager".to_string(),
})?;
let stream_provider_manager = self.stream_provider_manager.ok_or_else(|| QueryError::BuildQueryDispatcher {
err: "lost of stream_provider_manager".to_string(),
})?;
let memory_pool = self.memory_pool.ok_or_else(|| QueryError::BuildQueryDispatcher {
err: "lost of memory pool".to_string(),
})?;
let default_table_provider = self.default_table_provider.ok_or_else(|| QueryError::BuildQueryDispatcher {
err: "lost of default_table_provider".to_string(),
})?;
let span_ctx = self.span_ctx;
let auth_cache = self.auth_cache.ok_or_else(|| QueryError::BuildQueryDispatcher {
err: "lost of auth_cache".to_string(),
})?;
let dispatcher = Arc::new(SimpleQueryDispatcher {
coord,
input,
default_table_provider,
split_manager,
session_factory,
memory_pool,
parser,
query_execution_factory,
query_tracker,
func_manager,
stream_provider_manager,
span_ctx,
async_task_joinhandle: Arc::new(Mutex::new(HashMap::new())),
failed_task_joinhandle: Arc::new(Mutex::new(HashMap::new())),
auth_cache,
});
let meta_task_receiver = dispatcher
.coord
.meta_manager()
.take_resourceinfo_rx()
.expect("meta resource channel only has one consumer");
tokio::spawn(SimpleQueryDispatcher::recv_meta_modify(dispatcher.clone(), meta_task_receiver));
Ok(dispatcher)
}
}

View File

@@ -1,42 +1,29 @@
use std::sync::Arc;
use api::query::execution::QueryExecutionFactory;
use api::{
query::{
execution::{QueryExecutionFactory, QueryExecutionRef, QueryStateMachineRef},
logical_planner::Plan,
optimizer::Optimizer,
scheduler::SchedulerRef,
},
QueryError,
};
use async_trait::async_trait;
use super::query::SqlQueryExecution;
pub type QueryExecutionFactoryRef = Arc<dyn QueryExecutionFactory + Send + Sync>;
pub struct SqlQueryExecutionFactory {
optimizer: Arc<dyn Optimizer + Send + Sync>,
scheduler: SchedulerRef,
query_tracker: Arc<QueryTracker>,
trigger_executor_factory: TriggerExecutorFactoryRef,
runtime: Arc<DedicatedExecutor>,
stream_checker_manager: StreamCheckerManagerRef,
}
impl SqlQueryExecutionFactory {
#[inline(always)]
pub fn new(
optimizer: Arc<dyn Optimizer + Send + Sync>,
scheduler: SchedulerRef,
query_tracker: Arc<QueryTracker>,
stream_checker_manager: StreamCheckerManagerRef,
config: Arc<QueryOptions>,
) -> Self {
// Only do periodic scheduling, no need for many threads
let trigger_executor_runtime = DedicatedExecutor::new("stream-trigger", config.stream_trigger_cpu);
let trigger_executor_factory = Arc::new(TriggerExecutorFactory::new(Arc::new(trigger_executor_runtime)));
// perform stream-related preparations, not actual operator execution
let runtime = Arc::new(DedicatedExecutor::new("stream-executor", config.stream_executor_cpu));
Self {
optimizer,
scheduler,
query_tracker,
trigger_executor_factory,
runtime,
stream_checker_manager,
}
pub fn new(optimizer: Arc<dyn Optimizer + Send + Sync>, scheduler: SchedulerRef) -> Self {
Self { optimizer, scheduler }
}
}
@@ -48,62 +35,12 @@ impl QueryExecutionFactory for SqlQueryExecutionFactory {
state_machine: QueryStateMachineRef,
) -> Result<QueryExecutionRef, QueryError> {
match plan {
Plan::Query(query_plan) => {
// collect all stream sources involved in the execution plan
let stream_providers = extract_stream_providers(&query_plan);
// (has a stream table, is explain, is dml)
match (!stream_providers.is_empty(), query_plan.is_explain(), is_dml(&query_plan)) {
(false, _, _) | (true, true, _) => Ok(Arc::new(SqlQueryExecution::new(
state_machine,
query_plan,
self.optimizer.clone(),
self.scheduler.clone(),
))),
(true, false, true) => {
// streaming operation
// stream source + dml + !explain
let options = state_machine.session.inner().config().into();
let exec = MicroBatchStreamExecutionBuilder::new(MicroBatchStreamExecutionDesc {
plan: Arc::new(query_plan),
options,
})
.with_stream_providers(stream_providers)
.build(
state_machine,
self.scheduler.clone(),
self.trigger_executor_factory.clone(),
self.runtime.clone(),
)
.await?;
Ok(Arc::new(exec))
}
(true, false, false) => {
// stream source + !dml + !explain
Err(QueryError::NotImplemented {
err: "Stream table can only be used as source table in insert select statements.".to_string(),
})
}
}
}
Plan::DDL(ddl_plan) => Ok(Arc::new(DDLExecution::new(state_machine, self.stream_checker_manager.clone(), ddl_plan))),
Plan::DML(dml_plan) => Ok(Arc::new(DMLExecution::new(state_machine, dml_plan))),
Plan::SYSTEM(sys_plan) => Ok(Arc::new(SystemExecution::new(state_machine, sys_plan, self.query_tracker.clone()))),
Plan::Query(query_plan) => Ok(Arc::new(SqlQueryExecution::new(
state_machine,
query_plan,
self.optimizer.clone(),
self.scheduler.clone(),
))),
}
}
}
fn is_dml(query_plan: &QueryPlan) -> bool {
match &query_plan.df_plan {
LogicalPlan::Dml(_) => true,
LogicalPlan::Extension(Extension { node }) => downcast_plan_node::<TableWriterMergePlanNode>(node.as_ref()).is_some(),
_ => false,
}
}
impl Drop for SqlQueryExecutionFactory {
fn drop(&mut self) {
self.runtime.shutdown();
}
}

View File

@@ -1 +1,3 @@
pub mod factory;
pub mod query;
pub mod scheduler;

View File

@@ -0,0 +1,92 @@
use std::sync::Arc;
use api::query::execution::{Output, QueryExecution, QueryStateMachineRef};
use api::query::logical_planner::QueryPlan;
use api::query::optimizer::Optimizer;
use api::query::scheduler::SchedulerRef;
use api::{QueryError, QueryResult};
use async_trait::async_trait;
use futures::stream::AbortHandle;
use parking_lot::Mutex;
use tracing::debug;
pub struct SqlQueryExecution {
query_state_machine: QueryStateMachineRef,
plan: QueryPlan,
optimizer: Arc<dyn Optimizer + Send + Sync>,
scheduler: SchedulerRef,
abort_handle: Mutex<Option<AbortHandle>>,
}
impl SqlQueryExecution {
pub fn new(
query_state_machine: QueryStateMachineRef,
plan: QueryPlan,
optimizer: Arc<dyn Optimizer + Send + Sync>,
scheduler: SchedulerRef,
) -> Self {
Self {
query_state_machine,
plan,
optimizer,
scheduler,
abort_handle: Mutex::new(None),
}
}
async fn start(&self) -> QueryResult<Output> {
// begin optimize
self.query_state_machine.begin_optimize();
let physical_plan = self.optimizer.optimize(&self.plan, &self.query_state_machine.session).await?;
self.query_state_machine.end_optimize();
// begin schedule
self.query_state_machine.begin_schedule();
let stream = self
.scheduler
.schedule(physical_plan.clone(), self.query_state_machine.session.inner().task_ctx())
.await?
.stream();
debug!("Successfully built result stream.");
self.query_state_machine.end_schedule();
Ok(Output::StreamData(stream))
}
}
#[async_trait]
impl QueryExecution for SqlQueryExecution {
async fn start(&self) -> QueryResult<Output> {
let (task, abort_handle) = futures::future::abortable(self.start());
{
*self.abort_handle.lock() = Some(abort_handle);
}
task.await.map_err(|_| QueryError::Cancel)?
}
fn cancel(&self) -> QueryResult<()> {
debug!(
"cancel sql query execution: sql: {}, state: {:?}",
self.query_state_machine.query.content(),
self.query_state_machine.state()
);
// change state
self.query_state_machine.cancel();
// stop future task
if let Some(e) = self.abort_handle.lock().as_ref() {
e.abort()
};
debug!(
"canceled sql query execution: sql: {}, state: {:?}",
self.query_state_machine.query.content(),
self.query_state_machine.state()
);
Ok(())
}
}

View File

@@ -0,0 +1,22 @@
use std::sync::Arc;
use api::query::scheduler::{ExecutionResults, Scheduler};
use async_trait::async_trait;
use datafusion::error::DataFusionError;
use datafusion::execution::context::TaskContext;
use datafusion::physical_plan::{execute_stream, ExecutionPlan};
pub struct LocalScheduler {}
#[async_trait]
impl Scheduler for LocalScheduler {
async fn schedule(
&self,
plan: Arc<dyn ExecutionPlan>,
context: Arc<TaskContext>,
) -> Result<ExecutionResults, DataFusionError> {
let stream = execute_stream(plan, context)?;
Ok(ExecutionResults::new(stream))
}
}

View File

@@ -0,0 +1 @@
pub mod local;

View File

@@ -1,12 +1,22 @@
use std::sync::Arc;
use api::{
query::{dispatcher::QueryDispatcher, execution::QueryStateMachineRef, logical_planner::Plan, Query},
query::{
dispatcher::QueryDispatcher, execution::QueryStateMachineRef, logical_planner::Plan, session::SessionCtxFactory, Query,
},
server::dbms::{DatabaseManagerSystem, QueryHandle},
QueryResult,
};
use async_trait::async_trait;
use derive_builder::Builder;
use s3s::dto::SelectObjectContentInput;
use crate::{
dispatcher::manager::SimpleQueryDispatcherBuilder,
execution::{factory::SqlQueryExecutionFactory, scheduler::local::LocalScheduler},
metadata::base_table::BaseTableProvider,
sql::{optimizer::CascadeOptimizerBuilder, parser::DefaultParser},
};
#[derive(Builder)]
pub struct RustFSms<D: QueryDispatcher> {
@@ -19,10 +29,6 @@ impl<D> DatabaseManagerSystem for RustFSms<D>
where
D: QueryDispatcher,
{
async fn start(&self) -> QueryResult<()> {
self.query_dispatcher.start().await
}
async fn execute(&self, query: &Query) -> QueryResult<QueryHandle> {
let result = self.query_dispatcher.execute_query(query).await?;
@@ -54,15 +60,31 @@ where
Ok(QueryHandle::new(query.clone(), result))
}
fn metrics(&self) -> String {
let infos = self.query_dispatcher.running_query_infos();
let status = self.query_dispatcher.running_query_status();
format!(
"infos: {}\nstatus: {}\n",
infos.iter().map(|e| format!("{:?}", e)).collect::<Vec<_>>().join(","),
status.iter().map(|e| format!("{:?}", e)).collect::<Vec<_>>().join(",")
)
}
}
pub async fn make_cnosdbms(input: SelectObjectContentInput) -> QueryResult<impl DatabaseManagerSystem> {
// TODO: session config needs to load the global system config
let session_factory = Arc::new(SessionCtxFactory {});
let parser = Arc::new(DefaultParser::default());
let optimizer = Arc::new(CascadeOptimizerBuilder::default().build());
// TODO: wrap this, and make num_threads configurable
let scheduler = Arc::new(LocalScheduler {});
let query_execution_factory = Arc::new(SqlQueryExecutionFactory::new(optimizer, scheduler));
let default_table_provider = Arc::new(BaseTableProvider::default());
let query_dispatcher = SimpleQueryDispatcherBuilder::default()
.with_input(input)
.with_default_table_provider(default_table_provider)
.with_session_factory(session_factory)
.with_parser(parser)
.with_query_execution_factory(query_execution_factory)
.build()?;
let mut builder = RustFSmsBuilder::default();
let db_server = builder.query_dispatcher(query_dispatcher).build().expect("build db server");
Ok(db_server)
}
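Hedged end-to-end sketch of how the S3 handler drives this entry point (run_select is a hypothetical helper; error mapping and the event-stream plumbing are elided):

use datafusion::arrow::record_batch::RecordBatch;
use s3s::dto::SelectObjectContentInput;

// Build the DBMS, run the SELECT expression, and collect the result batches.
pub async fn run_select(input: SelectObjectContentInput) -> QueryResult<Vec<RecordBatch>> {
    let db = make_cnosdbms(input.clone()).await?;
    let query = api::query::Query::new(api::query::Context { input: input.clone() }, input.request.expression);
    let handle = db.execute(&query).await?;
    let batches = handle.result().chunk_result().await.unwrap().to_vec();
    Ok(batches)
}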

View File

@@ -1,63 +1,17 @@
use std::sync::Arc;
use datafusion::common::Result as DFResult;
use datafusion::config::{CsvOptions, JsonOptions};
use datafusion::datasource::file_format::csv::CsvFormat;
use datafusion::datasource::file_format::json::JsonFormat;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl};
use datafusion::error::DataFusionError;
use datafusion::execution::SessionState;
use datafusion::datasource::listing::ListingTable;
use crate::data_source::data_source::TableHandle;
use super::TableHandleProvider;
pub enum FileType {
Csv,
Parquet,
Json,
Unknown,
}
#[derive(Default)]
pub struct BaseTableProvider {
file_type: FileType,
}
impl BaseTableProvider {
pub fn new(file_type: FileType) -> Self {
Self { file_type }
}
}
pub struct BaseTableProvider {}
impl TableHandleProvider for BaseTableProvider {
async fn build_table_handle(&self, session_state: &SessionState, table_name: &str) -> DFResult<TableHandle> {
let table_path = ListingTableUrl::parse(table_name)?;
let listing_options = match self.file_type {
FileType::Csv => {
let file_format = CsvFormat::default().with_options(CsvOptions::default().with_has_header(false));
ListingOptions::new(Arc::new(file_format)).with_file_extension(".csv")
}
FileType::Parquet => {
let file_format = ParquetFormat::new();
ListingOptions::new(Arc::new(file_format)).with_file_extension(".parquet")
}
FileType::Json => {
let file_format = JsonFormat::default();
ListingOptions::new(Arc::new(file_format)).with_file_extension(".json")
}
FileType::Unknown => {
return Err(DataFusionError::NotImplemented("not support this file type".to_string()));
}
};
let resolve_schema = listing_options.infer_schema(session_state, &table_path).await?;
let config = ListingTableConfig::new(table_path)
.with_listing_options(listing_options)
.with_schema(resolve_schema);
let provider = Arc::new(ListingTable::try_new(config)?);
fn build_table_handle(&self, provider: Arc<ListingTable>) -> DFResult<TableHandle> {
Ok(TableHandle::External(provider))
}
}

View File

@@ -5,10 +5,12 @@ use api::ResolvedTable;
use async_trait::async_trait;
use datafusion::arrow::datatypes::DataType;
use datafusion::common::Result as DFResult;
use datafusion::datasource::listing::ListingTable;
use datafusion::error::DataFusionError;
use datafusion::execution::SessionState;
use datafusion::logical_expr::var_provider::is_system_variables;
use datafusion::logical_expr::{AggregateUDF, ScalarUDF, TableSource, WindowUDF};
use datafusion::sql::ResolvedTableReference;
use datafusion::variable::VarType;
use datafusion::{
config::ConfigOptions,
@@ -21,16 +23,17 @@ pub mod base_table;
#[async_trait]
pub trait ContextProviderExtension: ContextProvider {
fn get_table_source(&self, name: TableReference) -> datafusion::common::Result<Arc<TableSourceAdapter>>;
fn get_table_source_(&self, name: TableReference) -> datafusion::common::Result<Arc<TableSourceAdapter>>;
}
pub type TableHandleProviderRef = Arc<dyn TableHandleProvider + Send + Sync>;
pub trait TableHandleProvider {
fn build_table_handle(&self, session_state: &SessionState, table_name: &str) -> DFResult<TableHandle>;
fn build_table_handle(&self, provider: Arc<ListingTable>) -> DFResult<TableHandle>;
}
pub struct MetadataProvider {
provider: Arc<ListingTable>,
session: SessionCtx,
config_options: ConfigOptions,
func_manager: FuncMetaManagerRef,
@@ -40,11 +43,13 @@ pub struct MetadataProvider {
impl MetadataProvider {
#[allow(clippy::too_many_arguments)]
pub fn new(
provider: Arc<ListingTable>,
current_session_table_provider: TableHandleProviderRef,
func_manager: FuncMetaManagerRef,
session: SessionCtx,
) -> Self {
Self {
provider,
current_session_table_provider,
config_options: session.inner().config_options().clone(),
session,
@@ -52,43 +57,22 @@ impl MetadataProvider {
}
}
fn build_table_handle(&self, name: &ResolvedTable) -> datafusion::common::Result<TableHandle> {
let table_name = name.table();
self.current_session_table_provider.build_table_handle(table_name)
fn build_table_handle(&self) -> datafusion::common::Result<TableHandle> {
self.current_session_table_provider
.build_table_handle(self.provider.clone())
}
async fn init(&self) {}
}
#[async_trait::async_trait]
impl ContextProviderExtension for MetadataProvider {
fn get_table_source(&self, table_ref: TableReference) -> datafusion::common::Result<Arc<TableSourceAdapter>> {
let name = table_ref
.clone()
.resolve_object(self.session.tenant(), self.session.default_database())?;
fn get_table_source_(&self, table_ref: TableReference) -> datafusion::common::Result<Arc<TableSourceAdapter>> {
let name = table_ref.clone().resolve("", "");
let table_name = &*name.table;
let table_name = name.table();
let database_name = name.database();
let tenant_name = name.tenant();
let table_handle = self.build_table_handle()?;
// Cannot query across tenants
if self.session.tenant() != tenant_name {
return Err(DataFusionError::Plan(format!(
"Tenant conflict, the current connection's tenant is {}",
self.session.tenant()
)));
}
// save access table
self.access_databases.write().push_table(database_name, table_name);
let table_handle = self.build_table_handle(&name)?;
Ok(Arc::new(TableSourceAdapter::try_new(
table_ref.to_owned_reference(),
database_name,
table_name,
table_handle,
)?))
Ok(Arc::new(TableSourceAdapter::try_new(table_ref.clone(), table_name, table_handle)?))
}
}
@@ -132,7 +116,7 @@ impl ContextProvider for MetadataProvider {
}
fn get_table_source(&self, name: TableReference) -> DFResult<Arc<dyn TableSource>> {
Ok(self.get_table_source(name)?)
Ok(self.get_table_source_(name)?)
}
fn udf_names(&self) -> Vec<String> {

View File

@@ -0,0 +1,33 @@
use api::query::analyzer::Analyzer;
use api::query::session::SessionCtx;
use api::QueryResult;
use datafusion::logical_expr::LogicalPlan;
use datafusion::optimizer::analyzer::Analyzer as DFAnalyzer;
pub struct DefaultAnalyzer {
inner: DFAnalyzer,
}
impl DefaultAnalyzer {
pub fn new() -> Self {
let analyzer = DFAnalyzer::default();
// we can add analyzer rules here
Self { inner: analyzer }
}
}
impl Default for DefaultAnalyzer {
fn default() -> Self {
Self::new()
}
}
impl Analyzer for DefaultAnalyzer {
fn analyze(&self, plan: &LogicalPlan, session: &SessionCtx) -> QueryResult<LogicalPlan> {
let plan = self
.inner
.execute_and_check(plan.to_owned(), session.inner().config_options(), |_, _| {})?;
Ok(plan)
}
}

View File

@@ -0,0 +1,18 @@
use datafusion::sql::sqlparser::dialect::Dialect;
#[derive(Debug, Default)]
pub struct RustFsDialect;
impl Dialect for RustFsDialect {
fn is_identifier_start(&self, ch: char) -> bool {
ch.is_alphabetic() || ch == '_' || ch == '#' || ch == '@'
}
fn is_identifier_part(&self, ch: char) -> bool {
ch.is_alphabetic() || ch.is_ascii_digit() || ch == '@' || ch == '$' || ch == '#' || ch == '_'
}
fn supports_group_by_expr(&self) -> bool {
true
}
}
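A quick sketch of exercising the dialect directly with sqlparser (illustrative, not part of the commit):

use datafusion::sql::sqlparser::parser::Parser;

// The dialect admits '#'/'@'-prefixed identifiers and GROUP BY expressions.
fn parse_with_dialect() {
    let dialect = RustFsDialect;
    let statements = Parser::parse_sql(&dialect, "SELECT a, COUNT(*) FROM s3object GROUP BY a")
        .expect("valid SQL for RustFsDialect");
    assert_eq!(statements.len(), 1);
}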

View File

@@ -1 +1,2 @@
pub mod optimizer;
pub mod planner;

View File

@@ -0,0 +1,115 @@
use std::sync::Arc;
use api::{
query::{analyzer::AnalyzerRef, logical_planner::QueryPlan, session::SessionCtx},
QueryResult,
};
use datafusion::{
execution::SessionStateBuilder,
logical_expr::LogicalPlan,
optimizer::{
common_subexpr_eliminate::CommonSubexprEliminate, decorrelate_predicate_subquery::DecorrelatePredicateSubquery,
eliminate_cross_join::EliminateCrossJoin, eliminate_duplicated_expr::EliminateDuplicatedExpr,
eliminate_filter::EliminateFilter, eliminate_join::EliminateJoin, eliminate_limit::EliminateLimit,
eliminate_outer_join::EliminateOuterJoin, extract_equijoin_predicate::ExtractEquijoinPredicate,
filter_null_join_keys::FilterNullJoinKeys, propagate_empty_relation::PropagateEmptyRelation,
push_down_filter::PushDownFilter, push_down_limit::PushDownLimit,
replace_distinct_aggregate::ReplaceDistinctWithAggregate, scalar_subquery_to_join::ScalarSubqueryToJoin,
simplify_expressions::SimplifyExpressions, single_distinct_to_groupby::SingleDistinctToGroupBy,
unwrap_cast_in_comparison::UnwrapCastInComparison, OptimizerRule,
},
};
use tracing::debug;
use crate::sql::analyzer::DefaultAnalyzer;
const PUSH_DOWN_PROJECTION_INDEX: usize = 24;
pub trait LogicalOptimizer: Send + Sync {
fn optimize(&self, plan: &QueryPlan, session: &SessionCtx) -> QueryResult<LogicalPlan>;
fn inject_optimizer_rule(&mut self, optimizer_rule: Arc<dyn OptimizerRule + Send + Sync>);
}
pub struct DefaultLogicalOptimizer {
// fit datafusion
// TODO refactor
analyzer: AnalyzerRef,
rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
}
impl DefaultLogicalOptimizer {
#[allow(dead_code)]
fn with_optimizer_rules(mut self, rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>) -> Self {
self.rules = rules;
self
}
}
impl Default for DefaultLogicalOptimizer {
fn default() -> Self {
let analyzer = Arc::new(DefaultAnalyzer::default());
// additional optimizer rule
let rules: Vec<Arc<dyn OptimizerRule + Send + Sync>> = vec![
// df default rules start
Arc::new(SimplifyExpressions::new()),
Arc::new(UnwrapCastInComparison::new()),
Arc::new(ReplaceDistinctWithAggregate::new()),
Arc::new(EliminateJoin::new()),
Arc::new(DecorrelatePredicateSubquery::new()),
Arc::new(ScalarSubqueryToJoin::new()),
Arc::new(ExtractEquijoinPredicate::new()),
// simplify expressions does not simplify expressions in subqueries, so we
// run it again after running the optimizations that potentially converted
// subqueries to joins
Arc::new(SimplifyExpressions::new()),
Arc::new(EliminateDuplicatedExpr::new()),
Arc::new(EliminateFilter::new()),
Arc::new(EliminateCrossJoin::new()),
Arc::new(CommonSubexprEliminate::new()),
Arc::new(EliminateLimit::new()),
Arc::new(PropagateEmptyRelation::new()),
Arc::new(FilterNullJoinKeys::default()),
Arc::new(EliminateOuterJoin::new()),
// Filters can't be pushed down past Limits, we should do PushDownFilter after PushDownLimit
Arc::new(PushDownLimit::new()),
Arc::new(PushDownFilter::new()),
Arc::new(SingleDistinctToGroupBy::new()),
// The previous optimizations added expressions and projections,
// that might benefit from the following rules
Arc::new(SimplifyExpressions::new()),
Arc::new(UnwrapCastInComparison::new()),
Arc::new(CommonSubexprEliminate::new()),
// PushDownProjection can pushdown Projections through Limits, do PushDownLimit again.
Arc::new(PushDownLimit::new()),
// df default rules end
// custom rules can add at here
];
Self { analyzer, rules }
}
}
impl LogicalOptimizer for DefaultLogicalOptimizer {
fn optimize(&self, plan: &QueryPlan, session: &SessionCtx) -> QueryResult<LogicalPlan> {
let analyzed_plan = self.analyzer.analyze(&plan.df_plan, session)?;
debug!("Analyzed logical plan:\n{}\n", analyzed_plan.display_indent_schema());
let optimized_plan = SessionStateBuilder::new_from_existing(session.inner().clone())
.with_optimizer_rules(self.rules.clone())
.build()
.optimize(&analyzed_plan)?;
Ok(optimized_plan)
}
fn inject_optimizer_rule(&mut self, optimizer_rule: Arc<dyn OptimizerRule + Send + Sync>) {
self.rules.push(optimizer_rule);
}
}

View File

@@ -1,3 +1,7 @@
pub mod analyzer;
pub mod dialect;
pub mod logical;
pub mod optimizer;
pub mod parser;
pub mod physical;
pub mod planner;

View File

@@ -0,0 +1,89 @@
use std::sync::Arc;
use api::{
query::{logical_planner::QueryPlan, optimizer::Optimizer, physical_planner::PhysicalPlanner, session::SessionCtx},
QueryResult,
};
use async_trait::async_trait;
use datafusion::physical_plan::{displayable, ExecutionPlan};
use tracing::debug;
use super::{
logical::optimizer::{DefaultLogicalOptimizer, LogicalOptimizer},
physical::{optimizer::PhysicalOptimizer, planner::DefaultPhysicalPlanner},
};
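/// Drives the full optimization pipeline: logical optimization, then physical planning, then physical optimization.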
pub struct CascadeOptimizer {
logical_optimizer: Arc<dyn LogicalOptimizer + Send + Sync>,
physical_planner: Arc<dyn PhysicalPlanner + Send + Sync>,
physical_optimizer: Arc<dyn PhysicalOptimizer + Send + Sync>,
}
#[async_trait]
impl Optimizer for CascadeOptimizer {
async fn optimize(&self, plan: &QueryPlan, session: &SessionCtx) -> QueryResult<Arc<dyn ExecutionPlan>> {
debug!("Original logical plan:\n{}\n", plan.df_plan.display_indent_schema(),);
let optimized_logical_plan = self.logical_optimizer.optimize(plan, session)?;
debug!("Final logical plan:\n{}\n", optimized_logical_plan.display_indent_schema(),);
let physical_plan = {
self.physical_planner
.create_physical_plan(&optimized_logical_plan, session)
.await
.map(|p| p)
.map_err(|err| err)?
};
debug!("Original physical plan:\n{}\n", displayable(physical_plan.as_ref()).indent(false));
let optimized_physical_plan = {
self.physical_optimizer
.optimize(physical_plan, session)
.map(|p| p)
.map_err(|err| err)?
};
Ok(optimized_physical_plan)
}
}
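/// Builder for [`CascadeOptimizer`]; any component left unset falls back to the default implementation.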
#[derive(Default)]
pub struct CascadeOptimizerBuilder {
logical_optimizer: Option<Arc<dyn LogicalOptimizer + Send + Sync>>,
physical_planner: Option<Arc<dyn PhysicalPlanner + Send + Sync>>,
physical_optimizer: Option<Arc<dyn PhysicalOptimizer + Send + Sync>>,
}
impl CascadeOptimizerBuilder {
pub fn with_logical_optimizer(mut self, logical_optimizer: Arc<dyn LogicalOptimizer + Send + Sync>) -> Self {
self.logical_optimizer = Some(logical_optimizer);
self
}
pub fn with_physical_planner(mut self, physical_planner: Arc<dyn PhysicalPlanner + Send + Sync>) -> Self {
self.physical_planner = Some(physical_planner);
self
}
pub fn with_physical_optimizer(mut self, physical_optimizer: Arc<dyn PhysicalOptimizer + Send + Sync>) -> Self {
self.physical_optimizer = Some(physical_optimizer);
self
}
pub fn build(self) -> CascadeOptimizer {
let default_logical_optimizer = Arc::new(DefaultLogicalOptimizer::default());
let default_physical_planner = Arc::new(DefaultPhysicalPlanner::default());
let logical_optimizer = self.logical_optimizer.unwrap_or(default_logical_optimizer);
let physical_planner = self.physical_planner.unwrap_or_else(|| default_physical_planner.clone());
let physical_optimizer = self.physical_optimizer.unwrap_or(default_physical_planner);
CascadeOptimizer {
logical_optimizer,
physical_planner,
physical_optimizer,
}
}
}
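// A minimal usage sketch (assumptions: `query_plan` and `session` come from the surrounding
// query engine; this is illustrative, not a prescribed entry point):
//
// let optimizer = CascadeOptimizerBuilder::default().build();
// let physical_plan = optimizer.optimize(&query_plan, &session).await?;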

View File

@@ -0,0 +1,97 @@
use std::{collections::VecDeque, fmt::Display};
use api::{
query::{ast::ExtStatement, parser::Parser as RustFsParser},
ParserSnafu,
};
use datafusion::sql::sqlparser::{
dialect::Dialect,
parser::{Parser, ParserError},
tokenizer::{Token, Tokenizer},
};
use snafu::ResultExt;
use super::dialect::RustFsDialect;
pub type Result<T, E = ParserError> = std::result::Result<T, E>;
// Use `Parser::expected` instead, if possible
macro_rules! parser_err {
($MSG:expr) => {
Err(ParserError::ParserError($MSG.to_string()))
};
}
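/// Default [`RustFsParser`] implementation, backed by [`ExtParser`] and the RustFs SQL dialect.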
#[derive(Default)]
pub struct DefaultParser {}
impl RustFsParser for DefaultParser {
fn parse(&self, sql: &str) -> api::QueryResult<VecDeque<ExtStatement>> {
ExtParser::parse_sql(sql).context(ParserSnafu)
}
}
/// SQL Parser
pub struct ExtParser<'a> {
parser: Parser<'a>,
}
impl<'a> ExtParser<'a> {
/// Tokenize the SQL text with the given dialect and build a parser over the resulting tokens
fn new_with_dialect(sql: &str, dialect: &'a dyn Dialect) -> Result<Self> {
let mut tokenizer = Tokenizer::new(dialect, sql);
let tokens = tokenizer.tokenize()?;
Ok(ExtParser {
parser: Parser::new(dialect).with_tokens(tokens),
})
}
/// Parse a SQL statement and produce a set of statements
pub fn parse_sql(sql: &str) -> Result<VecDeque<ExtStatement>> {
let dialect = &RustFsDialect {};
ExtParser::parse_sql_with_dialect(sql, dialect)
}
/// Parse a SQL statement and produce a set of statements
pub fn parse_sql_with_dialect(sql: &str, dialect: &dyn Dialect) -> Result<VecDeque<ExtStatement>> {
let mut parser = ExtParser::new_with_dialect(sql, dialect)?;
let mut stmts = VecDeque::new();
let mut expecting_statement_delimiter = false;
loop {
// ignore empty statements (between successive statement delimiters)
while parser.parser.consume_token(&Token::SemiColon) {
expecting_statement_delimiter = false;
}
if parser.parser.peek_token() == Token::EOF {
break;
}
if expecting_statement_delimiter {
return parser.expected("end of statement", parser.parser.peek_token());
}
let statement = parser.parse_statement()?;
stmts.push_back(statement);
expecting_statement_delimiter = true;
}
// debug!("Parser sql: {}, stmts: {:#?}", sql, stmts);
Ok(stmts)
}
/// Parse a single statement
fn parse_statement(&mut self) -> Result<ExtStatement> {
match self.parser.peek_token().token {
Token::Word(w) => match w.keyword {
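// custom statement keywords can be handled here; everything else falls back to DataFusion's parser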
_ => Ok(ExtStatement::SqlStatement(Box::new(self.parser.parse_statement()?))),
},
_ => Ok(ExtStatement::SqlStatement(Box::new(self.parser.parse_statement()?))),
}
}
// Report unexpected token
fn expected<T>(&self, expected: &str, found: impl Display) -> Result<T> {
parser_err!(format!("Expected {}, found: {}", expected, found))
}
}
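// A minimal usage sketch (a plain SQL statement parses into a single `ExtStatement::SqlStatement`):
//
// let stmts = ExtParser::parse_sql("SELECT 1;")?;
// assert_eq!(stmts.len(), 1);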

View File

@@ -1 +1,2 @@
pub mod optimizer;
pub mod planner;

View File

@@ -0,0 +1,12 @@
use std::sync::Arc;
use api::query::session::SessionCtx;
use api::QueryResult;
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::ExecutionPlan;
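/// Rewrites a physical execution plan using the injected physical optimizer rules.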
pub trait PhysicalOptimizer {
fn optimize(&self, plan: Arc<dyn ExecutionPlan>, session: &SessionCtx) -> QueryResult<Arc<dyn ExecutionPlan>>;
fn inject_optimizer_rule(&mut self, optimizer_rule: Arc<dyn PhysicalOptimizerRule + Send + Sync>);
}

View File

@@ -0,0 +1,104 @@
use std::sync::Arc;
use api::query::physical_planner::PhysicalPlanner;
use api::query::session::SessionCtx;
use api::QueryResult;
use async_trait::async_trait;
use datafusion::execution::SessionStateBuilder;
use datafusion::logical_expr::LogicalPlan;
use datafusion::physical_optimizer::aggregate_statistics::AggregateStatistics;
use datafusion::physical_optimizer::coalesce_batches::CoalesceBatches;
use datafusion::physical_optimizer::join_selection::JoinSelection;
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_planner::{
DefaultPhysicalPlanner as DFDefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner as DFPhysicalPlanner,
};
use super::optimizer::PhysicalOptimizer;
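/// Default physical planner: delegates to DataFusion's planner while carrying extension transform and optimizer rules.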
pub struct DefaultPhysicalPlanner {
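/// Converts extension (custom) logical nodes into physical operators during planning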
ext_physical_transform_rules: Vec<Arc<dyn ExtensionPlanner + Send + Sync>>,
/// Responsible for optimizing a physical execution plan
ext_physical_optimizer_rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>,
}
impl DefaultPhysicalPlanner {
#[allow(dead_code)]
fn with_physical_transform_rules(mut self, rules: Vec<Arc<dyn ExtensionPlanner + Send + Sync>>) -> Self {
self.ext_physical_transform_rules = rules;
self
}
}
impl DefaultPhysicalPlanner {
#[allow(dead_code)]
fn with_optimizer_rules(mut self, rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>) -> Self {
self.ext_physical_optimizer_rules = rules;
self
}
}
impl Default for DefaultPhysicalPlanner {
fn default() -> Self {
let ext_physical_transform_rules: Vec<Arc<dyn ExtensionPlanner + Send + Sync>> = vec![
// can add rules at here
];
// We need to take care of the rule ordering. They may influence each other.
let ext_physical_optimizer_rules: Vec<Arc<dyn PhysicalOptimizerRule + Sync + Send>> = vec![
Arc::new(AggregateStatistics::new()),
// Statistics-based join selection will change the Auto mode to a real join implementation,
// like collect left, or hash join, or future sort merge join, which will influence the
// EnforceDistribution and EnforceSorting rules as they decide whether to add additional
// repartitioning and local sorting steps to meet distribution and ordering requirements.
// Therefore, it should run before EnforceDistribution and EnforceSorting.
Arc::new(JoinSelection::new()),
// The CoalesceBatches rule will not influence the distribution and ordering of the
// whole plan tree. Therefore, to avoid influencing other rules, it should run last.
Arc::new(CoalesceBatches::new()),
];
Self {
ext_physical_transform_rules,
ext_physical_optimizer_rules,
}
}
}
#[async_trait]
impl PhysicalPlanner for DefaultPhysicalPlanner {
async fn create_physical_plan(
&self,
logical_plan: &LogicalPlan,
session: &SessionCtx,
) -> QueryResult<Arc<dyn ExecutionPlan>> {
// Inject the extension physical optimizer rules into DataFusion's session state
let new_state = SessionStateBuilder::new_from_existing(session.inner().clone())
.with_physical_optimizer_rules(self.ext_physical_optimizer_rules.clone())
.build();
// Build DataFusion's physical planner with the extension transform rules
let planner = DFDefaultPhysicalPlanner::with_extension_planners(self.ext_physical_transform_rules.clone());
// Run DataFusion's physical planning and optimization
planner
.create_physical_plan(logical_plan, &new_state)
.await
.map_err(|e| e.into())
}
fn inject_physical_transform_rule(&mut self, rule: Arc<dyn ExtensionPlanner + Send + Sync>) {
self.ext_physical_transform_rules.push(rule)
}
}
impl PhysicalOptimizer for DefaultPhysicalPlanner {
fn optimize(&self, plan: Arc<dyn ExecutionPlan>, _session: &SessionCtx) -> QueryResult<Arc<dyn ExecutionPlan>> {
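// The extension physical optimizer rules are injected into the session state in
// `create_physical_plan`, so no additional rewriting is done here; the plan is returned unchanged.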
Ok(plan)
}
fn inject_optimizer_rule(&mut self, optimizer_rule: Arc<dyn PhysicalOptimizerRule + Send + Sync>) {
self.ext_physical_optimizer_rules.push(optimizer_rule);
}
}
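// A minimal usage sketch (assumes an optimized `LogicalPlan` and a `SessionCtx` are available):
//
// let planner = DefaultPhysicalPlanner::default();
// let exec_plan = planner.create_physical_plan(&logical_plan, &session).await?;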

View File

@@ -1,4 +1,14 @@
use datafusion::sql::planner::SqlToRel;
use api::{
query::{
ast::ExtStatement,
logical_planner::{LogicalPlanner, Plan, QueryPlan},
session::SessionCtx,
},
QueryError, QueryResult,
};
use async_recursion::async_recursion;
use async_trait::async_trait;
use datafusion::sql::{planner::SqlToRel, sqlparser::ast::Statement};
use crate::metadata::ContextProviderExtension;
@@ -6,3 +16,45 @@ pub struct SqlPlanner<'a, S: ContextProviderExtension> {
schema_provider: &'a S,
df_planner: SqlToRel<'a, S>,
}
#[async_trait]
impl<'a, S: ContextProviderExtension + Send + Sync> LogicalPlanner for SqlPlanner<'a, S> {
async fn create_logical_plan(&self, statement: ExtStatement, session: &SessionCtx) -> QueryResult<Plan> {
let plan = self.statement_to_plan(statement, session).await?;
Ok(plan)
}
}
impl<'a, S: ContextProviderExtension + Send + Sync + 'a> SqlPlanner<'a, S> {
/// Create a new query planner
pub fn new(schema_provider: &'a S) -> Self {
SqlPlanner {
schema_provider,
df_planner: SqlToRel::new(schema_provider),
}
}
/// Generate a logical plan from an extended SQL statement
#[async_recursion]
pub(crate) async fn statement_to_plan(&self, statement: ExtStatement, session: &SessionCtx) -> QueryResult<Plan> {
match statement {
ExtStatement::SqlStatement(stmt) => self.df_sql_to_plan(*stmt, session).await,
}
}
async fn df_sql_to_plan(&self, stmt: Statement, _session: &SessionCtx) -> QueryResult<Plan> {
match stmt {
Statement::Query(_) => {
let df_plan = self.df_planner.sql_statement_to_plan(stmt)?;
let plan = Plan::Query(QueryPlan {
df_plan,
is_tag_scan: false,
});
Ok(plan)
}
_ => Err(QueryError::NotImplemented { err: stmt.to_string() }),
}
}
}
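// A minimal usage sketch (assumptions: `provider` implements `ContextProviderExtension`, and
// `statement` is an `ExtStatement` produced by the parser above; names are illustrative):
//
// let planner = SqlPlanner::new(&provider);
// let plan = planner.create_logical_plan(statement, &session).await?;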