Signed-off-by: junxiang Mu <1948535941@qq.com>
Author: junxiang Mu
Date: 2025-03-12 13:11:54 +00:00
parent 31697a55b6
commit 63ef986bac
33 changed files with 3660 additions and 3167 deletions

Cargo.lock (generated)

File diff suppressed because it is too large

@@ -11,6 +11,8 @@ members = [
"iam", # Identity and Access Management
"crypto", # Cryptography and security features
"cli/rustfs-gui", # Graphical user interface client
"s3select/api",
"s3select/query",
]
resolver = "2"
@@ -33,8 +35,10 @@ async-trait = "0.1.86"
backon = "1.3.0"
bytes = "1.9.0"
bytesize = "1.3.0"
chrono = { version = "0.4.40", features = ["serde"] }
chrono = { version = "0.4.39", features = ["serde"] }
clap = { version = "4.5.31", features = ["derive", "env"] }
datafusion = "46.0.0"
derive_builder = "0.20.2"
dioxus = { version = "0.6.3", features = ["router"] }
dirs = "6.0.0"
ecstore = { path = "./ecstore" }
@@ -112,7 +116,7 @@ md-5 = "0.10.6"
workers = { path = "./common/workers" }
test-case = "3.3.1"
zip = "2.2.3"
snafu = "0.8.5"
[profile.wasm-dev]


@@ -1,10 +1,9 @@
// automatically generated by the FlatBuffers compiler, do not modify
// @generated
use core::mem;
use core::cmp::Ordering;
use core::mem;
extern crate flatbuffers;
use self::flatbuffers::{EndianScalar, Follow};
@@ -12,112 +11,114 @@ use self::flatbuffers::{EndianScalar, Follow};
#[allow(unused_imports, dead_code)]
pub mod models {
use core::mem;
use core::cmp::Ordering;
use core::cmp::Ordering;
use core::mem;
extern crate flatbuffers;
use self::flatbuffers::{EndianScalar, Follow};
extern crate flatbuffers;
use self::flatbuffers::{EndianScalar, Follow};
pub enum PingBodyOffset {}
#[derive(Copy, Clone, PartialEq)]
pub enum PingBodyOffset {}
#[derive(Copy, Clone, PartialEq)]
pub struct PingBody<'a> {
pub _tab: flatbuffers::Table<'a>,
}
impl<'a> flatbuffers::Follow<'a> for PingBody<'a> {
type Inner = PingBody<'a>;
#[inline]
unsafe fn follow(buf: &'a [u8], loc: usize) -> Self::Inner {
Self { _tab: flatbuffers::Table::new(buf, loc) }
}
}
impl<'a> PingBody<'a> {
pub const VT_PAYLOAD: flatbuffers::VOffsetT = 4;
pub const fn get_fully_qualified_name() -> &'static str {
"models.PingBody"
}
#[inline]
pub unsafe fn init_from_table(table: flatbuffers::Table<'a>) -> Self {
PingBody { _tab: table }
}
#[allow(unused_mut)]
pub fn create<'bldr: 'args, 'args: 'mut_bldr, 'mut_bldr, A: flatbuffers::Allocator + 'bldr>(
_fbb: &'mut_bldr mut flatbuffers::FlatBufferBuilder<'bldr, A>,
args: &'args PingBodyArgs<'args>
) -> flatbuffers::WIPOffset<PingBody<'bldr>> {
let mut builder = PingBodyBuilder::new(_fbb);
if let Some(x) = args.payload { builder.add_payload(x); }
builder.finish()
}
#[inline]
pub fn payload(&self) -> Option<flatbuffers::Vector<'a, u8>> {
// Safety:
// Created from valid Table for this object
// which contains a valid value in this slot
unsafe { self._tab.get::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'a, u8>>>(PingBody::VT_PAYLOAD, None)}
}
}
impl flatbuffers::Verifiable for PingBody<'_> {
#[inline]
fn run_verifier(
v: &mut flatbuffers::Verifier, pos: usize
) -> Result<(), flatbuffers::InvalidFlatbuffer> {
use self::flatbuffers::Verifiable;
v.visit_table(pos)?
.visit_field::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'_, u8>>>("payload", Self::VT_PAYLOAD, false)?
.finish();
Ok(())
}
}
pub struct PingBodyArgs<'a> {
pub payload: Option<flatbuffers::WIPOffset<flatbuffers::Vector<'a, u8>>>,
}
impl<'a> Default for PingBodyArgs<'a> {
  #[inline]
  fn default() -> Self {
    PingBodyArgs {
      payload: None,
    }
  }
}
pub struct PingBodyBuilder<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> {
  fbb_: &'b mut flatbuffers::FlatBufferBuilder<'a, A>,
  start_: flatbuffers::WIPOffset<flatbuffers::TableUnfinishedWIPOffset>,
}
impl<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> PingBodyBuilder<'a, 'b, A> {
  #[inline]
  pub fn add_payload(&mut self, payload: flatbuffers::WIPOffset<flatbuffers::Vector<'b , u8>>) {
    self.fbb_.push_slot_always::<flatbuffers::WIPOffset<_>>(PingBody::VT_PAYLOAD, payload);
  }
  #[inline]
  pub fn new(_fbb: &'b mut flatbuffers::FlatBufferBuilder<'a, A>) -> PingBodyBuilder<'a, 'b, A> {
    let start = _fbb.start_table();
    PingBodyBuilder {
      fbb_: _fbb,
      start_: start,
    }
  }
  #[inline]
  pub fn finish(self) -> flatbuffers::WIPOffset<PingBody<'a>> {
    let o = self.fbb_.end_table(self.start_);
    flatbuffers::WIPOffset::new(o.value())
  }
}
impl core::fmt::Debug for PingBody<'_> {
  fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
    let mut ds = f.debug_struct("PingBody");
    ds.field("payload", &self.payload());
    ds.finish()
  }
}
} // pub mod models
pub struct PingBody<'a> {
    pub _tab: flatbuffers::Table<'a>,
}
impl<'a> flatbuffers::Follow<'a> for PingBody<'a> {
    type Inner = PingBody<'a>;
    #[inline]
    unsafe fn follow(buf: &'a [u8], loc: usize) -> Self::Inner {
        Self {
            _tab: flatbuffers::Table::new(buf, loc),
        }
    }
}
impl<'a> PingBody<'a> {
pub const VT_PAYLOAD: flatbuffers::VOffsetT = 4;
pub const fn get_fully_qualified_name() -> &'static str {
"models.PingBody"
}
#[inline]
pub unsafe fn init_from_table(table: flatbuffers::Table<'a>) -> Self {
PingBody { _tab: table }
}
#[allow(unused_mut)]
pub fn create<'bldr: 'args, 'args: 'mut_bldr, 'mut_bldr, A: flatbuffers::Allocator + 'bldr>(
_fbb: &'mut_bldr mut flatbuffers::FlatBufferBuilder<'bldr, A>,
args: &'args PingBodyArgs<'args>,
) -> flatbuffers::WIPOffset<PingBody<'bldr>> {
let mut builder = PingBodyBuilder::new(_fbb);
if let Some(x) = args.payload {
builder.add_payload(x);
}
builder.finish()
}
#[inline]
pub fn payload(&self) -> Option<flatbuffers::Vector<'a, u8>> {
// Safety:
// Created from valid Table for this object
// which contains a valid value in this slot
unsafe {
self._tab
.get::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'a, u8>>>(PingBody::VT_PAYLOAD, None)
}
}
}
impl flatbuffers::Verifiable for PingBody<'_> {
#[inline]
fn run_verifier(v: &mut flatbuffers::Verifier, pos: usize) -> Result<(), flatbuffers::InvalidFlatbuffer> {
use self::flatbuffers::Verifiable;
v.visit_table(pos)?
.visit_field::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'_, u8>>>("payload", Self::VT_PAYLOAD, false)?
.finish();
Ok(())
}
}
pub struct PingBodyArgs<'a> {
pub payload: Option<flatbuffers::WIPOffset<flatbuffers::Vector<'a, u8>>>,
}
impl<'a> Default for PingBodyArgs<'a> {
#[inline]
fn default() -> Self {
PingBodyArgs { payload: None }
}
}
pub struct PingBodyBuilder<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> {
fbb_: &'b mut flatbuffers::FlatBufferBuilder<'a, A>,
start_: flatbuffers::WIPOffset<flatbuffers::TableUnfinishedWIPOffset>,
}
impl<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> PingBodyBuilder<'a, 'b, A> {
#[inline]
pub fn add_payload(&mut self, payload: flatbuffers::WIPOffset<flatbuffers::Vector<'b, u8>>) {
self.fbb_
.push_slot_always::<flatbuffers::WIPOffset<_>>(PingBody::VT_PAYLOAD, payload);
}
#[inline]
pub fn new(_fbb: &'b mut flatbuffers::FlatBufferBuilder<'a, A>) -> PingBodyBuilder<'a, 'b, A> {
let start = _fbb.start_table();
PingBodyBuilder {
fbb_: _fbb,
start_: start,
}
}
#[inline]
pub fn finish(self) -> flatbuffers::WIPOffset<PingBody<'a>> {
let o = self.fbb_.end_table(self.start_);
flatbuffers::WIPOffset::new(o.value())
}
}
impl core::fmt::Debug for PingBody<'_> {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
let mut ds = f.debug_struct("PingBody");
ds.field("payload", &self.payload());
ds.finish()
}
}
} // pub mod models
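
A minimal usage sketch for the generated bindings above (the build_ping helper is illustrative and not part of this commit; it assumes the flatbuffers crate and the models module are in scope):

use flatbuffers::FlatBufferBuilder;

// Serialize a byte payload into a PingBody table and return the finished buffer.
fn build_ping(payload: &[u8]) -> Vec<u8> {
    let mut fbb = FlatBufferBuilder::new();
    let payload = fbb.create_vector(payload);
    let body = models::PingBody::create(&mut fbb, &models::PingBodyArgs { payload: Some(payload) });
    fbb.finish(body, None);
    fbb.finished_data().to_vec()
}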

File diff suppressed because it is too large

s3select/api/Cargo.toml (new file)

@@ -0,0 +1,10 @@
[package]
name = "api"
version.workspace = true
edition.workspace = true
[dependencies]
async-trait.workspace = true
datafusion = { workspace = true }
futures = { workspace = true }
snafu = { workspace = true, features = ["backtrace"] }

s3select/api/src/lib.rs (new file)

@@ -0,0 +1,52 @@
use std::fmt::Display;
use datafusion::common::DataFusionError;
use snafu::{Backtrace, Location, Snafu};
pub mod query;
pub mod server;
pub type QueryResult<T> = Result<T, QueryError>;
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum QueryError {
Datafusion {
source: DataFusionError,
location: Location,
backtrace: Backtrace,
},
}
impl From<DataFusionError> for QueryError {
fn from(value: DataFusionError) -> Self {
match value {
DataFusionError::External(e) if e.downcast_ref::<QueryError>().is_some() => *e.downcast::<QueryError>().unwrap(),
v => Self::Datafusion {
source: v,
location: Default::default(),
backtrace: Backtrace::capture(),
},
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ResolvedTable {
// path
table: String,
}
impl ResolvedTable {
pub fn table(&self) -> &str {
&self.table
}
}
impl Display for ResolvedTable {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let Self { table } = self;
write!(f, "{table}")
}
}
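
To illustrate the From<DataFusionError> conversion above: a QueryError boxed into DataFusionError::External is recovered by downcasting rather than being re-wrapped in QueryError::Datafusion. A hedged sketch (demo_roundtrip is hypothetical, not part of this commit):

use datafusion::common::DataFusionError;

// The original QueryError comes back out of the External wrapper unchanged.
fn demo_roundtrip(original: QueryError) -> QueryError {
    let df_err = DataFusionError::External(Box::new(original));
    QueryError::from(df_err)
}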


@@ -0,0 +1,8 @@
use datafusion::sql::sqlparser::ast::Statement;
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ExtStatement {
/// ANSI SQL AST node
SqlStatement(Box<Statement>),
// we can expand command
}
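
For context, ExtStatement values would typically be produced from raw SQL with DataFusion's bundled sqlparser, roughly as sketched below (parse_sql is an assumption about how a Parser implementation might look, not part of this commit):

use std::collections::VecDeque;
use datafusion::sql::sqlparser::dialect::GenericDialect;
use datafusion::sql::sqlparser::parser::{Parser as SqlParser, ParserError};

// Parse a SQL string and wrap each statement as ExtStatement::SqlStatement.
fn parse_sql(sql: &str) -> Result<VecDeque<ExtStatement>, ParserError> {
    let statements = SqlParser::parse_sql(&GenericDialect {}, sql)?;
    Ok(statements
        .into_iter()
        .map(|stmt| ExtStatement::SqlStatement(Box::new(stmt)))
        .collect())
}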


@@ -0,0 +1 @@


@@ -0,0 +1,36 @@
use std::sync::Arc;
use async_trait::async_trait;
use crate::QueryResult;
use super::{
execution::{Output, QueryStateMachine},
logical_planner::Plan,
Query,
};
#[async_trait]
pub trait QueryDispatcher: Send + Sync {
async fn start(&self) -> QueryResult<()>;
fn stop(&self);
// fn create_query_id(&self) -> QueryId;
// fn query_info(&self, id: &QueryId);
async fn execute_query(&self, query: &Query) -> QueryResult<Output>;
async fn build_logical_plan(&self, query_state_machine: Arc<QueryStateMachine>) -> QueryResult<Option<Plan>>;
async fn execute_logical_plan(&self, logical_plan: Plan, query_state_machine: Arc<QueryStateMachine>) -> QueryResult<Output>;
async fn build_query_state_machine(&self, query: Query) -> QueryResult<Arc<QueryStateMachine>>;
// fn running_query_infos(&self) -> Vec<QueryInfo>;
// fn running_query_status(&self) -> Vec<QueryStatus>;
// fn cancel_query(&self, id: &QueryId);
}


@@ -0,0 +1,253 @@
use std::fmt::Display;
use std::pin::Pin;
use std::sync::atomic::{AtomicPtr, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use async_trait::async_trait;
use datafusion::arrow::datatypes::{Schema, SchemaRef};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::physical_plan::SendableRecordBatchStream;
use futures::{Stream, StreamExt, TryStreamExt};
use crate::{QueryError, QueryResult};
use super::logical_planner::Plan;
use super::session::SessionCtx;
use super::Query;
pub type QueryExecutionRef = Arc<dyn QueryExecution>;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QueryType {
Batch,
Stream,
}
impl Display for QueryType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Batch => write!(f, "batch"),
Self::Stream => write!(f, "stream"),
}
}
}
#[async_trait]
pub trait QueryExecution: Send + Sync {
fn query_type(&self) -> QueryType {
QueryType::Batch
}
// start the execution
async fn start(&self) -> QueryResult<Output>;
// stop the execution
fn cancel(&self) -> QueryResult<()>;
// query state
// query plan
// static info
// fn info(&self) -> QueryInfo;
// runtime info
// fn status(&self) -> QueryStatus;
// sql
// resource usage (CPU time / memory / throughput, etc.)
// whether the query info needs to be persisted
fn need_persist(&self) -> bool {
false
}
}
pub enum Output {
StreamData(SendableRecordBatchStream),
Nil(()),
}
impl Output {
pub fn schema(&self) -> SchemaRef {
match self {
Self::StreamData(stream) => stream.schema(),
Self::Nil(_) => Arc::new(Schema::empty()),
}
}
pub async fn chunk_result(self) -> QueryResult<Vec<RecordBatch>> {
match self {
Self::Nil(_) => Ok(vec![]),
Self::StreamData(stream) => {
let schema = stream.schema();
let mut res: Vec<RecordBatch> = stream.try_collect::<Vec<RecordBatch>>().await?;
if res.is_empty() {
res.push(RecordBatch::new_empty(schema));
}
Ok(res)
}
}
}
pub async fn num_rows(self) -> usize {
match self.chunk_result().await {
Ok(rb) => rb.iter().map(|e| e.num_rows()).sum(),
Err(_) => 0,
}
}
/// Returns the number of records affected by the query operation
///
/// If it is a select statement, returns the number of rows in the result set
///
/// -1 means unknown
///
/// Panics if the number of records in StreamData is greater than i64::MAX
pub async fn affected_rows(self) -> i64 {
self.num_rows().await as i64
}
}
impl Stream for Output {
type Item = std::result::Result<RecordBatch, QueryError>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
match this {
Output::StreamData(stream) => stream.poll_next_unpin(cx).map_err(|e| e.into()),
Output::Nil(_) => Poll::Ready(None),
}
}
}
#[async_trait]
pub trait QueryExecutionFactory {
async fn create_query_execution(
&self,
plan: Plan,
query_state_machine: QueryStateMachineRef,
) -> QueryResult<QueryExecutionRef>;
}
pub type QueryStateMachineRef = Arc<QueryStateMachine>;
pub struct QueryStateMachine {
pub session: SessionCtx,
pub query: Query,
state: AtomicPtr<QueryState>,
start: Instant,
}
impl QueryStateMachine {
pub fn begin(query: Query, session: SessionCtx) -> Self {
Self {
session,
query,
state: AtomicPtr::new(Box::into_raw(Box::new(QueryState::ACCEPTING))),
start: Instant::now(),
}
}
pub fn begin_analyze(&self) {
// TODO record time
self.translate_to(Box::new(QueryState::RUNNING(RUNNING::ANALYZING)));
}
pub fn end_analyze(&self) {
// TODO record time
}
pub fn begin_optimize(&self) {
// TODO record time
self.translate_to(Box::new(QueryState::RUNNING(RUNNING::OPTMIZING)));
}
pub fn end_optimize(&self) {
// TODO
}
pub fn begin_schedule(&self) {
// TODO
self.translate_to(Box::new(QueryState::RUNNING(RUNNING::SCHEDULING)));
}
pub fn end_schedule(&self) {
// TODO
}
pub fn finish(&self) {
// TODO
self.translate_to(Box::new(QueryState::DONE(DONE::FINISHED)));
}
pub fn cancel(&self) {
// TODO
self.translate_to(Box::new(QueryState::DONE(DONE::CANCELLED)));
}
pub fn fail(&self) {
// TODO
self.translate_to(Box::new(QueryState::DONE(DONE::FAILED)));
}
pub fn state(&self) -> &QueryState {
unsafe { &*self.state.load(Ordering::Relaxed) }
}
pub fn duration(&self) -> Duration {
self.start.elapsed()
}
fn translate_to(&self, state: Box<QueryState>) {
self.state.store(Box::into_raw(state), Ordering::Relaxed);
}
}
#[derive(Debug, Clone)]
pub enum QueryState {
ACCEPTING,
RUNNING(RUNNING),
DONE(DONE),
}
impl AsRef<str> for QueryState {
fn as_ref(&self) -> &str {
match self {
QueryState::ACCEPTING => "ACCEPTING",
QueryState::RUNNING(e) => e.as_ref(),
QueryState::DONE(e) => e.as_ref(),
}
}
}
#[derive(Debug, Clone)]
pub enum RUNNING {
DISPATCHING,
ANALYZING,
OPTMIZING,
SCHEDULING,
}
impl AsRef<str> for RUNNING {
fn as_ref(&self) -> &str {
match self {
Self::DISPATCHING => "DISPATCHING",
Self::ANALYZING => "ANALYZING",
Self::OPTMIZING => "OPTMIZING",
Self::SCHEDULING => "SCHEDULING",
}
}
}
#[derive(Debug, Clone)]
pub enum DONE {
FINISHED,
FAILED,
CANCELLED,
}
impl AsRef<str> for DONE {
fn as_ref(&self) -> &str {
match self {
Self::FINISHED => "FINISHED",
Self::FAILED => "FAILED",
Self::CANCELLED => "CANCELLED",
}
}
}
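
A small sketch of how a dispatcher is expected to drive the state machine defined above (drive_states and its assertion are illustrative only, not part of this commit):

// Walk a query through its lifecycle and check the final state label.
fn drive_states(query: Query, session: SessionCtx) {
    let sm = QueryStateMachine::begin(query, session);
    sm.begin_analyze();
    sm.end_analyze();
    sm.begin_optimize();
    sm.end_optimize();
    sm.begin_schedule();
    sm.end_schedule();
    sm.finish();
    assert_eq!(sm.state().as_ref(), "FINISHED");
    println!("query finished in {:?}", sm.duration());
}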


@@ -0,0 +1,23 @@
use std::collections::HashSet;
use std::sync::Arc;
use datafusion::logical_expr::{AggregateUDF, ScalarUDF, WindowUDF};
use crate::QueryResult;
pub type FuncMetaManagerRef = Arc<dyn FunctionMetadataManager + Send + Sync>;
pub trait FunctionMetadataManager {
fn register_udf(&mut self, udf: ScalarUDF) -> QueryResult<()>;
fn register_udaf(&mut self, udaf: AggregateUDF) -> QueryResult<()>;
fn register_udwf(&mut self, udwf: WindowUDF) -> QueryResult<()>;
fn udf(&self, name: &str) -> QueryResult<Arc<ScalarUDF>>;
fn udaf(&self, name: &str) -> QueryResult<Arc<AggregateUDF>>;
fn udwf(&self, name: &str) -> QueryResult<Arc<WindowUDF>>;
fn udfs(&self) -> HashSet<String>;
}
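
A minimal in-memory sketch of this trait, backed by HashMaps and returning a plain DataFusion Plan error on missing lookups (SimpleFunctionMetadata and its error messages are assumptions, not part of this commit):

use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use datafusion::common::DataFusionError;
use datafusion::logical_expr::{AggregateUDF, ScalarUDF, WindowUDF};

#[derive(Default)]
struct SimpleFunctionMetadata {
    udfs: HashMap<String, Arc<ScalarUDF>>,
    udafs: HashMap<String, Arc<AggregateUDF>>,
    udwfs: HashMap<String, Arc<WindowUDF>>,
}

impl FunctionMetadataManager for SimpleFunctionMetadata {
    fn register_udf(&mut self, udf: ScalarUDF) -> QueryResult<()> {
        self.udfs.insert(udf.name().to_string(), Arc::new(udf));
        Ok(())
    }
    fn register_udaf(&mut self, udaf: AggregateUDF) -> QueryResult<()> {
        self.udafs.insert(udaf.name().to_string(), Arc::new(udaf));
        Ok(())
    }
    fn register_udwf(&mut self, udwf: WindowUDF) -> QueryResult<()> {
        self.udwfs.insert(udwf.name().to_string(), Arc::new(udwf));
        Ok(())
    }
    fn udf(&self, name: &str) -> QueryResult<Arc<ScalarUDF>> {
        self.udfs
            .get(name)
            .cloned()
            .ok_or_else(|| DataFusionError::Plan(format!("udf {name} not found")).into())
    }
    fn udaf(&self, name: &str) -> QueryResult<Arc<AggregateUDF>> {
        self.udafs
            .get(name)
            .cloned()
            .ok_or_else(|| DataFusionError::Plan(format!("udaf {name} not found")).into())
    }
    fn udwf(&self, name: &str) -> QueryResult<Arc<WindowUDF>> {
        self.udwfs
            .get(name)
            .cloned()
            .ok_or_else(|| DataFusionError::Plan(format!("udwf {name} not found")).into())
    }
    fn udfs(&self) -> HashSet<String> {
        self.udfs.keys().cloned().collect()
    }
}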


@@ -0,0 +1,29 @@
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::logical_expr::LogicalPlan as DFPlan;
#[derive(Clone)]
pub enum Plan {
// only query SQL is supported
/// Query plan
Query(QueryPlan),
}
impl Plan {
pub fn schema(&self) -> SchemaRef {
match self {
Self::Query(p) => SchemaRef::from(p.df_plan.schema().as_ref().to_owned()),
}
}
}
#[derive(Debug, Clone)]
pub struct QueryPlan {
pub df_plan: DFPlan,
pub is_tag_scan: bool,
}
impl QueryPlan {
pub fn is_explain(&self) -> bool {
matches!(self.df_plan, DFPlan::Explain(_) | DFPlan::Analyze(_))
}
}


@@ -0,0 +1,19 @@
pub mod ast;
pub mod datasource;
pub mod dispatcher;
pub mod execution;
pub mod function;
pub mod logical_planner;
pub mod parser;
pub mod session;
#[derive(Clone)]
pub struct Context {
// maybe we need to transfer some info?
}
#[derive(Clone)]
pub struct Query {
context: Context,
content: String,
}


@@ -0,0 +1,8 @@
use std::collections::VecDeque;
use super::ast::ExtStatement;
use crate::QueryResult;
pub trait Parser {
fn parse(&self, sql: &str) -> QueryResult<VecDeque<ExtStatement>>;
}


@@ -0,0 +1,20 @@
use std::sync::Arc;
use datafusion::execution::context::SessionState;
#[derive(Clone)]
pub struct SessionCtx {
desc: Arc<SessionCtxDesc>,
inner: SessionState,
}
impl SessionCtx {
pub fn inner(&self) -> &SessionState {
&self.inner
}
}
#[derive(Clone)]
pub struct SessionCtxDesc {
// maybe we need some info
}


@@ -0,0 +1,43 @@
use async_trait::async_trait;
use crate::{
query::{
execution::{Output, QueryStateMachineRef},
logical_planner::Plan,
Query,
},
QueryResult,
};
pub struct QueryHandle {
query: Query,
result: Output,
}
impl QueryHandle {
pub fn new(query: Query, result: Output) -> Self {
Self { query, result }
}
pub fn query(&self) -> &Query {
&self.query
}
pub fn result(self) -> Output {
self.result
}
}
#[async_trait]
pub trait DatabaseManagerSystem {
async fn start(&self) -> QueryResult<()>;
async fn execute(&self, query: &Query) -> QueryResult<QueryHandle>;
async fn build_query_state_machine(&self, query: Query) -> QueryResult<QueryStateMachineRef>;
async fn build_logical_plan(&self, query_state_machine: QueryStateMachineRef) -> QueryResult<Option<Plan>>;
async fn execute_logical_plan(
&self,
logical_plan: Plan,
query_state_machine: QueryStateMachineRef,
) -> QueryResult<QueryHandle>;
fn metrics(&self) -> String;
}


@@ -0,0 +1 @@
pub mod dbms;

s3select/query/Cargo.toml (new file)

@@ -0,0 +1,12 @@
[package]
name = "query"
version.workspace = true
edition.workspace = true
[dependencies]
api = { path = "../api" }
async-trait.workspace = true
datafusion = { workspace = true }
derive_builder = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }


@@ -0,0 +1,140 @@
use std::any::Any;
use std::fmt::Display;
use std::sync::Arc;
use std::write;
use async_trait::async_trait;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::common::Result as DFResult;
use datafusion::datasource::listing::ListingTable;
use datafusion::datasource::{provider_as_source, TableProvider};
use datafusion::error::DataFusionError;
use datafusion::logical_expr::{LogicalPlan, LogicalPlanBuilder, TableProviderFilterPushDown, TableSource};
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::Expr;
use datafusion::sql::TableReference;
use tracing::debug;
pub const TEMP_LOCATION_TABLE_NAME: &str = "external_location_table";
pub struct TableSourceAdapter {
database_name: String,
table_name: String,
table_handle: TableHandle,
plan: LogicalPlan,
}
impl TableSourceAdapter {
pub fn try_new(
table_ref: impl Into<TableReference>,
database_name: impl Into<String>,
table_name: impl Into<String>,
table_handle: impl Into<TableHandle>,
) -> Result<Self, DataFusionError> {
let database_name = database_name.into();
let table_name: String = table_name.into();
let table_handle = table_handle.into();
let plan = match &table_handle {
// TableScan
TableHandle::External(t) => {
let table_source = provider_as_source(t.clone());
LogicalPlanBuilder::scan(table_ref, table_source, None)?.build()?
}
// TableScan
TableHandle::TableProvider(t) => {
let table_source = provider_as_source(t.clone());
if let Some(plan) = table_source.get_logical_plan() {
LogicalPlanBuilder::from(plan.clone()).build()?
} else {
LogicalPlanBuilder::scan(table_ref, table_source, None)?.build()?
}
}
};
debug!("Table source logical plan node of {}:\n{}", table_name, plan.display_indent_schema());
Ok(Self {
database_name: "default_db".to_string(),
table_name,
table_handle,
plan,
})
}
pub fn database_name(&self) -> &str {
&self.database_name
}
pub fn table_name(&self) -> &str {
&self.table_name
}
pub fn table_handle(&self) -> &TableHandle {
&self.table_handle
}
}
#[async_trait]
impl TableSource for TableSourceAdapter {
fn as_any(&self) -> &dyn Any {
self
}
fn schema(&self) -> SchemaRef {
self.table_handle.schema()
}
fn supports_filters_pushdown(&self, filter: &[&Expr]) -> DFResult<Vec<TableProviderFilterPushDown>> {
self.table_handle.supports_filters_pushdown(filter)
}
/// Called by [`InlineTableScan`]
fn get_logical_plan(&self) -> Option<&LogicalPlan> {
Some(&self.plan)
}
}
#[derive(Clone)]
pub enum TableHandle {
TableProvider(Arc<dyn TableProvider>),
External(Arc<ListingTable>),
}
impl TableHandle {
pub fn schema(&self) -> SchemaRef {
match self {
Self::External(t) => t.schema(),
Self::TableProvider(t) => t.schema(),
}
}
pub fn supports_filters_pushdown(&self, filter: &[&Expr]) -> DFResult<Vec<TableProviderFilterPushDown>> {
match self {
Self::External(t) => t.supports_filters_pushdown(filter),
Self::TableProvider(t) => t.supports_filters_pushdown(filter),
}
}
}
impl From<Arc<dyn TableProvider>> for TableHandle {
fn from(value: Arc<dyn TableProvider>) -> Self {
TableHandle::TableProvider(value)
}
}
impl From<Arc<ListingTable>> for TableHandle {
fn from(value: Arc<ListingTable>) -> Self {
TableHandle::External(value)
}
}
impl Display for TableHandle {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::External(e) => write!(f, "External({:?})", e.table_paths()),
Self::TableProvider(_) => write!(f, "TableProvider"),
}
}
}
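
A sketch of how the adapter above gets wired up, here wrapping a DataFusion MemTable (the single-column schema, the table name "t", and adapter_for_memtable are illustrative only, not part of this commit):

use std::sync::Arc;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::datasource::{MemTable, TableProvider};
use datafusion::error::DataFusionError;

// Build a TableSourceAdapter around an in-memory table; the adapter derives its
// logical plan from a TableScan over the provider.
fn adapter_for_memtable() -> Result<TableSourceAdapter, DataFusionError> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]));
    let provider: Arc<dyn TableProvider> = Arc::new(MemTable::try_new(schema, vec![vec![]])?);
    TableSourceAdapter::try_new("t", "default_db", "t", provider)
}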


@@ -0,0 +1 @@
pub mod data_source;


@@ -0,0 +1,331 @@
use std::{
collections::HashMap,
sync::{Arc, Mutex},
};
use api::{
query::{
dispatcher::QueryDispatcher,
execution::{Output, QueryStateMachine},
function::FuncMetaManagerRef,
logical_planner::Plan,
parser::Parser,
Query,
},
QueryError, QueryResult,
};
use async_trait::async_trait;
use tokio::task::JoinHandle;
use crate::metadata::TableHandleProviderRef;
#[derive(Clone)]
pub struct SimpleQueryDispatcher {
// client for default tenant
default_table_provider: TableHandleProviderRef,
// memory pool
// memory_pool: MemoryPoolRef,
// query tracker
// parser
parser: Arc<dyn Parser + Send + Sync>,
// get query execution factory
query_execution_factory: QueryExecutionFactoryRef,
func_manager: FuncMetaManagerRef,
async_task_joinhandle: Arc<Mutex<HashMap<String, JoinHandle<()>>>>,
failed_task_joinhandle: Arc<Mutex<HashMap<String, JoinHandle<()>>>>,
}
#[async_trait]
impl QueryDispatcher for SimpleQueryDispatcher {
async fn start(&self) -> QueryResult<()> {
self.execute_persister_query(self.coord.node_id()).await
}
fn stop(&self) {
// TODO
}
async fn execute_query(&self, query: &Query) -> QueryResult<Output> {
let query_state_machine = { self.build_query_state_machine(query.clone()).await? };
let logical_plan = self.build_logical_plan(query_state_machine.clone()).await?;
let logical_plan = match logical_plan {
Some(plan) => plan,
None => return Ok(Output::Nil(())),
};
let result = self.execute_logical_plan(logical_plan, query_state_machine).await?;
Ok(result)
}
async fn build_logical_plan(&self, query_state_machine: Arc<QueryStateMachine>) -> QueryResult<Option<Plan>> {
let session = &query_state_machine.session;
let query = &query_state_machine.query;
let scheme_provider = self.build_scheme_provider(session).await?;
let logical_planner = DefaultLogicalPlanner::new(&scheme_provider);
let span_recorder = session.get_child_span("parse sql");
let statements = self.parser.parse(query.content())?;
// multiple statements are not allowed
if statements.len() > 1 {
return Err(QueryError::MultiStatement {
num: statements.len(),
sql: query_state_machine.query.content().to_string(),
});
}
let stmt = match statements.front() {
Some(stmt) => stmt.clone(),
None => return Ok(None),
};
drop(span_recorder);
let logical_plan = self
.statement_to_logical_plan(stmt, &logical_planner, query_state_machine)
.await?;
Ok(Some(logical_plan))
}
async fn execute_logical_plan(&self, logical_plan: Plan, query_state_machine: Arc<QueryStateMachine>) -> QueryResult<Output> {
self.execute_logical_plan(logical_plan, query_state_machine).await
}
async fn build_query_state_machine(&self, query: Query) -> QueryResult<Arc<QueryStateMachine>> {
let session = self
.session_factory
.create_session_ctx(query.context(), self.memory_pool.clone(), self.coord.clone())?;
let query_state_machine = Arc::new(QueryStateMachine::begin(query, session));
Ok(query_state_machine)
}
}
impl SimpleQueryDispatcher {
async fn statement_to_logical_plan<S: ContextProviderExtension + Send + Sync>(
&self,
stmt: ExtStatement,
logical_planner: &DefaultLogicalPlanner<'_, S>,
query_state_machine: Arc<QueryStateMachine>,
) -> QueryResult<Plan> {
// begin analyze
query_state_machine.begin_analyze();
let logical_plan = logical_planner
.create_logical_plan(stmt, &query_state_machine.session, self.coord.get_config().query.auth_enabled)
.await?;
query_state_machine.end_analyze();
Ok(logical_plan)
}
async fn execute_logical_plan(&self, logical_plan: Plan, query_state_machine: Arc<QueryStateMachine>) -> QueryResult<Output> {
let execution = self
.query_execution_factory
.create_query_execution(logical_plan, query_state_machine.clone())
.await?;
// TrackedQuery.drop() is called implicitly when the value goes out of scope,
self.query_tracker
.try_track_query(query_state_machine.query_id, execution)
.await?
.start()
.await
}
async fn build_scheme_provider(&self, session: &SessionCtx) -> QueryResult<MetadataProvider> {
let meta_client = self.build_current_session_meta_client(session).await?;
let current_session_table_provider = self.build_table_handle_provider(meta_client.clone())?;
let metadata_provider = MetadataProvider::new(
self.coord.clone(),
meta_client,
current_session_table_provider,
self.default_table_provider.clone(),
self.func_manager.clone(),
self.query_tracker.clone(),
session.clone(),
);
Ok(metadata_provider)
}
async fn build_current_session_meta_client(&self, session: &SessionCtx) -> QueryResult<MetaClientRef> {
let meta_client = self
.coord
.tenant_meta(session.tenant())
.await
.ok_or_else(|| MetaError::TenantNotFound {
tenant: session.tenant().to_string(),
})
.context(MetaSnafu)?;
Ok(meta_client)
}
fn build_table_handle_provider(&self, meta_client: MetaClientRef) -> QueryResult<TableHandleProviderRef> {
let current_session_table_provider: Arc<BaseTableProvider> = Arc::new(BaseTableProvider::new(
self.coord.clone(),
self.split_manager.clone(),
meta_client,
self.stream_provider_manager.clone(),
));
Ok(current_session_table_provider)
}
}
#[derive(Default, Clone)]
pub struct SimpleQueryDispatcherBuilder {
coord: Option<CoordinatorRef>,
default_table_provider: Option<TableHandleProviderRef>,
split_manager: Option<SplitManagerRef>,
session_factory: Option<Arc<SessionCtxFactory>>,
parser: Option<Arc<dyn Parser + Send + Sync>>,
query_execution_factory: Option<QueryExecutionFactoryRef>,
query_tracker: Option<Arc<QueryTracker>>,
memory_pool: Option<MemoryPoolRef>, // memory
func_manager: Option<FuncMetaManagerRef>,
stream_provider_manager: Option<StreamProviderManagerRef>,
span_ctx: Option<SpanContext>,
auth_cache: Option<Arc<AuthCache<AuthCacheKey, User>>>,
}
impl SimpleQueryDispatcherBuilder {
pub fn with_coord(mut self, coord: CoordinatorRef) -> Self {
self.coord = Some(coord);
self
}
pub fn with_default_table_provider(mut self, default_table_provider: TableHandleProviderRef) -> Self {
self.default_table_provider = Some(default_table_provider);
self
}
pub fn with_session_factory(mut self, session_factory: Arc<SessionCtxFactory>) -> Self {
self.session_factory = Some(session_factory);
self
}
pub fn with_split_manager(mut self, split_manager: SplitManagerRef) -> Self {
self.split_manager = Some(split_manager);
self
}
pub fn with_parser(mut self, parser: Arc<dyn Parser + Send + Sync>) -> Self {
self.parser = Some(parser);
self
}
pub fn with_query_execution_factory(mut self, query_execution_factory: QueryExecutionFactoryRef) -> Self {
self.query_execution_factory = Some(query_execution_factory);
self
}
pub fn with_query_tracker(mut self, query_tracker: Arc<QueryTracker>) -> Self {
self.query_tracker = Some(query_tracker);
self
}
pub fn with_memory_pool(mut self, memory_pool: MemoryPoolRef) -> Self {
self.memory_pool = Some(memory_pool);
self
}
pub fn with_func_manager(mut self, func_manager: FuncMetaManagerRef) -> Self {
self.func_manager = Some(func_manager);
self
}
pub fn with_stream_provider_manager(mut self, stream_provider_manager: StreamProviderManagerRef) -> Self {
self.stream_provider_manager = Some(stream_provider_manager);
self
}
pub fn with_span_ctx(mut self, span_ctx: Option<SpanContext>) -> Self {
self.span_ctx = span_ctx;
self
}
pub fn with_auth_cache(mut self, auth_cache: Arc<AuthCache<AuthCacheKey, User>>) -> Self {
self.auth_cache = Some(auth_cache);
self
}
pub fn build(self) -> QueryResult<Arc<SimpleQueryDispatcher>> {
let coord = self.coord.ok_or_else(|| QueryError::BuildQueryDispatcher {
err: "lost of coord".to_string(),
})?;
let split_manager = self.split_manager.ok_or_else(|| QueryError::BuildQueryDispatcher {
err: "lost of split manager".to_string(),
})?;
let session_factory = self.session_factory.ok_or_else(|| QueryError::BuildQueryDispatcher {
err: "lost of session_factory".to_string(),
})?;
let parser = self.parser.ok_or_else(|| QueryError::BuildQueryDispatcher {
err: "lost of parser".to_string(),
})?;
let query_execution_factory = self.query_execution_factory.ok_or_else(|| QueryError::BuildQueryDispatcher {
err: "lost of query_execution_factory".to_string(),
})?;
let query_tracker = self.query_tracker.ok_or_else(|| QueryError::BuildQueryDispatcher {
err: "lost of query_tracker".to_string(),
})?;
let func_manager = self.func_manager.ok_or_else(|| QueryError::BuildQueryDispatcher {
err: "lost of func_manager".to_string(),
})?;
let stream_provider_manager = self.stream_provider_manager.ok_or_else(|| QueryError::BuildQueryDispatcher {
err: "lost of stream_provider_manager".to_string(),
})?;
let memory_pool = self.memory_pool.ok_or_else(|| QueryError::BuildQueryDispatcher {
err: "lost of memory pool".to_string(),
})?;
let default_table_provider = self.default_table_provider.ok_or_else(|| QueryError::BuildQueryDispatcher {
err: "lost of default_table_provider".to_string(),
})?;
let span_ctx = self.span_ctx;
let auth_cache = self.auth_cache.ok_or_else(|| QueryError::BuildQueryDispatcher {
err: "lost of auth_cache".to_string(),
})?;
let dispatcher = Arc::new(SimpleQueryDispatcher {
coord,
default_table_provider,
split_manager,
session_factory,
memory_pool,
parser,
query_execution_factory,
query_tracker,
func_manager,
stream_provider_manager,
span_ctx,
async_task_joinhandle: Arc::new(Mutex::new(HashMap::new())),
failed_task_joinhandle: Arc::new(Mutex::new(HashMap::new())),
auth_cache,
});
let meta_task_receiver = dispatcher
.coord
.meta_manager()
.take_resourceinfo_rx()
.expect("meta resource channel only has one consumer");
tokio::spawn(SimpleQueryDispatcher::recv_meta_modify(dispatcher.clone(), meta_task_receiver));
Ok(dispatcher)
}
}


@@ -0,0 +1 @@
pub mod manager;


@@ -0,0 +1,109 @@
use std::sync::Arc;
use api::query::execution::QueryExecutionFactory;
pub type QueryExecutionFactoryRef = Arc<dyn QueryExecutionFactory + Send + Sync>;
pub struct SqlQueryExecutionFactory {
optimizer: Arc<dyn Optimizer + Send + Sync>,
scheduler: SchedulerRef,
query_tracker: Arc<QueryTracker>,
trigger_executor_factory: TriggerExecutorFactoryRef,
runtime: Arc<DedicatedExecutor>,
stream_checker_manager: StreamCheckerManagerRef,
}
impl SqlQueryExecutionFactory {
#[inline(always)]
pub fn new(
optimizer: Arc<dyn Optimizer + Send + Sync>,
scheduler: SchedulerRef,
query_tracker: Arc<QueryTracker>,
stream_checker_manager: StreamCheckerManagerRef,
config: Arc<QueryOptions>,
) -> Self {
// Only do periodic scheduling, no need for many threads
let trigger_executor_runtime = DedicatedExecutor::new("stream-trigger", config.stream_trigger_cpu);
let trigger_executor_factory = Arc::new(TriggerExecutorFactory::new(Arc::new(trigger_executor_runtime)));
// perform stream-related preparations, not actual operator execution
let runtime = Arc::new(DedicatedExecutor::new("stream-executor", config.stream_executor_cpu));
Self {
optimizer,
scheduler,
query_tracker,
trigger_executor_factory,
runtime,
stream_checker_manager,
}
}
}
#[async_trait]
impl QueryExecutionFactory for SqlQueryExecutionFactory {
async fn create_query_execution(
&self,
plan: Plan,
state_machine: QueryStateMachineRef,
) -> Result<QueryExecutionRef, QueryError> {
match plan {
Plan::Query(query_plan) => {
// collect all stream sources referenced in the execution plan
let stream_providers = extract_stream_providers(&query_plan);
// (has stream table, is explain, is dml)
match (!stream_providers.is_empty(), query_plan.is_explain(), is_dml(&query_plan)) {
(false, _, _) | (true, true, _) => Ok(Arc::new(SqlQueryExecution::new(
state_machine,
query_plan,
self.optimizer.clone(),
self.scheduler.clone(),
))),
(true, false, true) => {
// streaming operation
// stream source + dml + !explain
let options = state_machine.session.inner().config().into();
let exec = MicroBatchStreamExecutionBuilder::new(MicroBatchStreamExecutionDesc {
plan: Arc::new(query_plan),
options,
})
.with_stream_providers(stream_providers)
.build(
state_machine,
self.scheduler.clone(),
self.trigger_executor_factory.clone(),
self.runtime.clone(),
)
.await?;
Ok(Arc::new(exec))
}
(true, false, false) => {
// stream source + !dml + !explain
Err(QueryError::NotImplemented {
err: "Stream table can only be used as source table in insert select statements.".to_string(),
})
}
}
}
Plan::DDL(ddl_plan) => Ok(Arc::new(DDLExecution::new(state_machine, self.stream_checker_manager.clone(), ddl_plan))),
Plan::DML(dml_plan) => Ok(Arc::new(DMLExecution::new(state_machine, dml_plan))),
Plan::SYSTEM(sys_plan) => Ok(Arc::new(SystemExecution::new(state_machine, sys_plan, self.query_tracker.clone()))),
}
}
}
fn is_dml(query_plan: &QueryPlan) -> bool {
match &query_plan.df_plan {
LogicalPlan::Dml(_) => true,
LogicalPlan::Extension(Extension { node }) => downcast_plan_node::<TableWriterMergePlanNode>(node.as_ref()).is_some(),
_ => false,
}
}
impl Drop for SqlQueryExecutionFactory {
fn drop(&mut self) {
self.runtime.shutdown();
}
}


@@ -0,0 +1 @@
pub mod factory;


@@ -0,0 +1,68 @@
use std::sync::Arc;
use api::{
query::{dispatcher::QueryDispatcher, execution::QueryStateMachineRef, logical_planner::Plan, Query},
server::dbms::{DatabaseManagerSystem, QueryHandle},
QueryResult,
};
use async_trait::async_trait;
use derive_builder::Builder;
#[derive(Builder)]
pub struct RustFSms<D: QueryDispatcher> {
// query dispatcher & query execution
query_dispatcher: Arc<D>,
}
#[async_trait]
impl<D> DatabaseManagerSystem for RustFSms<D>
where
D: QueryDispatcher,
{
async fn start(&self) -> QueryResult<()> {
self.query_dispatcher.start().await
}
async fn execute(&self, query: &Query) -> QueryResult<QueryHandle> {
let result = self.query_dispatcher.execute_query(query).await?;
Ok(QueryHandle::new(query.clone(), result))
}
async fn build_query_state_machine(&self, query: Query) -> QueryResult<QueryStateMachineRef> {
let query_state_machine = self.query_dispatcher.build_query_state_machine(query).await?;
Ok(query_state_machine)
}
async fn build_logical_plan(&self, query_state_machine: QueryStateMachineRef) -> QueryResult<Option<Plan>> {
let logical_plan = self.query_dispatcher.build_logical_plan(query_state_machine).await?;
Ok(logical_plan)
}
async fn execute_logical_plan(
&self,
logical_plan: Plan,
query_state_machine: QueryStateMachineRef,
) -> QueryResult<QueryHandle> {
let query = query_state_machine.query.clone();
let result = self
.query_dispatcher
.execute_logical_plan(logical_plan, query_state_machine)
.await?;
Ok(QueryHandle::new(query.clone(), result))
}
fn metrics(&self) -> String {
let infos = self.query_dispatcher.running_query_infos();
let status = self.query_dispatcher.running_query_status();
format!(
"infos: {}\nstatus: {}\n",
infos.iter().map(|e| format!("{:?}", e)).collect::<Vec<_>>().join(","),
status.iter().map(|e| format!("{:?}", e)).collect::<Vec<_>>().join(",")
)
}
}


@@ -0,0 +1,6 @@
pub mod data_source;
pub mod dispatcher;
pub mod execution;
pub mod instance;
pub mod metadata;
pub mod sql;


@@ -0,0 +1,63 @@
use std::sync::Arc;
use datafusion::common::Result as DFResult;
use datafusion::config::{CsvOptions, JsonOptions};
use datafusion::datasource::file_format::csv::CsvFormat;
use datafusion::datasource::file_format::json::JsonFormat;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl};
use datafusion::error::DataFusionError;
use datafusion::execution::SessionState;
use crate::data_source::data_source::TableHandle;
use super::TableHandleProvider;
pub enum FileType {
Csv,
Parquet,
Json,
Unknown,
}
#[derive(Default)]
pub struct BaseTableProvider {
file_type: FileType,
}
impl BaseTableProvider {
pub fn new(file_type: FileType) -> Self {
Self { file_type }
}
}
impl TableHandleProvider for BaseTableProvider {
async fn build_table_handle(&self, session_state: &SessionState, table_name: &str) -> DFResult<TableHandle> {
let table_path = ListingTableUrl::parse(table_name)?;
let listing_options = match self.file_type {
FileType::Csv => {
let file_format = CsvFormat::default().with_options(CsvOptions::default().with_has_header(false));
ListingOptions::new(Arc::new(file_format)).with_file_extension(".csv")
}
FileType::Parquet => {
let file_format = ParquetFormat::new();
ListingOptions::new(Arc::new(file_format)).with_file_extension(".parquet")
}
FileType::Json => {
let file_format = JsonFormat::default();
ListingOptions::new(Arc::new(file_format)).with_file_extension(".json")
}
FileType::Unknown => {
return Err(DataFusionError::NotImplemented("unsupported file type".to_string()));
}
};
let resolve_schema = listing_options.infer_schema(session_state, &table_path).await?;
let config = ListingTableConfig::new(table_path)
.with_listing_options(listing_options)
.with_schema(resolve_schema);
let provider = Arc::new(ListingTable::try_new(config)?);
Ok(TableHandle::External(provider))
}
}
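
A usage sketch for the provider above, following the async signature used in this impl (the path and handle_for_csv are made up for illustration; state is assumed to be a DataFusion SessionState):

// Infer the schema of a local CSV file and wrap it as an external ListingTable handle.
async fn handle_for_csv(state: &SessionState) -> DFResult<TableHandle> {
    let provider = BaseTableProvider::new(FileType::Csv);
    provider.build_table_handle(state, "./data/input.csv").await
}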


@@ -0,0 +1,149 @@
use std::sync::Arc;
use api::query::{function::FuncMetaManagerRef, session::SessionCtx};
use api::ResolvedTable;
use async_trait::async_trait;
use datafusion::arrow::datatypes::DataType;
use datafusion::common::Result as DFResult;
use datafusion::error::DataFusionError;
use datafusion::execution::SessionState;
use datafusion::logical_expr::var_provider::is_system_variables;
use datafusion::logical_expr::{AggregateUDF, ScalarUDF, TableSource, WindowUDF};
use datafusion::variable::VarType;
use datafusion::{
config::ConfigOptions,
sql::{planner::ContextProvider, TableReference},
};
use crate::data_source::data_source::{TableHandle, TableSourceAdapter};
pub mod base_table;
#[async_trait]
pub trait ContextProviderExtension: ContextProvider {
fn get_table_source(&self, name: TableReference) -> datafusion::common::Result<Arc<TableSourceAdapter>>;
}
pub type TableHandleProviderRef = Arc<dyn TableHandleProvider + Send + Sync>;
pub trait TableHandleProvider {
fn build_table_handle(&self, session_state: &SessionState, table_name: &str) -> DFResult<TableHandle>;
}
pub struct MetadataProvider {
session: SessionCtx,
config_options: ConfigOptions,
func_manager: FuncMetaManagerRef,
current_session_table_provider: TableHandleProviderRef,
}
impl MetadataProvider {
#[allow(clippy::too_many_arguments)]
pub fn new(
current_session_table_provider: TableHandleProviderRef,
func_manager: FuncMetaManagerRef,
session: SessionCtx,
) -> Self {
Self {
current_session_table_provider,
config_options: session.inner().config_options().clone(),
session,
func_manager,
}
}
fn build_table_handle(&self, name: &ResolvedTable) -> datafusion::common::Result<TableHandle> {
let table_name = name.table();
self.current_session_table_provider.build_table_handle(table_name)
}
}
#[async_trait::async_trait]
impl ContextProviderExtension for MetadataProvider {
fn get_table_source(&self, table_ref: TableReference) -> datafusion::common::Result<Arc<TableSourceAdapter>> {
let name = table_ref
.clone()
.resolve_object(self.session.tenant(), self.session.default_database())?;
let table_name = name.table();
let database_name = name.database();
let tenant_name = name.tenant();
// Cannot query across tenants
if self.session.tenant() != tenant_name {
return Err(DataFusionError::Plan(format!(
"Tenant conflict, the current connection's tenant is {}",
self.session.tenant()
)));
}
// save access table
self.access_databases.write().push_table(database_name, table_name);
let table_handle = self.build_table_handle(&name)?;
Ok(Arc::new(TableSourceAdapter::try_new(
table_ref.to_owned_reference(),
database_name,
table_name,
table_handle,
)?))
}
}
impl ContextProvider for MetadataProvider {
fn get_function_meta(&self, name: &str) -> Option<Arc<ScalarUDF>> {
self.func_manager
.udf(name)
.ok()
.or(self.session.inner().scalar_functions().get(name).cloned())
}
fn get_aggregate_meta(&self, name: &str) -> Option<Arc<AggregateUDF>> {
self.func_manager.udaf(name).ok()
}
fn get_variable_type(&self, variable_names: &[String]) -> Option<DataType> {
if variable_names.is_empty() {
return None;
}
let var_type = if is_system_variables(variable_names) {
VarType::System
} else {
VarType::UserDefined
};
self.session
.inner()
.execution_props()
.get_var_provider(var_type)
.and_then(|p| p.get_type(variable_names))
}
fn options(&self) -> &ConfigOptions {
// TODO refactor
&self.config_options
}
fn get_window_meta(&self, name: &str) -> Option<Arc<WindowUDF>> {
self.func_manager.udwf(name).ok()
}
fn get_table_source(&self, name: TableReference) -> DFResult<Arc<dyn TableSource>> {
Ok(self.get_table_source(name)?)
}
fn udf_names(&self) -> Vec<String> {
todo!()
}
fn udaf_names(&self) -> Vec<String> {
todo!()
}
fn udwf_names(&self) -> Vec<String> {
todo!()
}
}


@@ -0,0 +1 @@
pub mod planner;


@@ -0,0 +1,3 @@
use crate::sql::planner::SqlPlanner;
pub type DefaultLogicalPlanner<'a, S> = SqlPlanner<'a, S>;


@@ -0,0 +1,3 @@
pub mod logical;
pub mod physical;
pub mod planner;


@@ -0,0 +1 @@


@@ -0,0 +1,8 @@
use datafusion::sql::planner::SqlToRel;
use crate::metadata::ContextProviderExtension;
pub struct SqlPlanner<'a, S: ContextProviderExtension> {
schema_provider: &'a S,
df_planner: SqlToRel<'a, S>,
}