Mirror of https://github.com/rustfs/rustfs.git, synced 2026-01-17 09:40:32 +00:00
Cargo.lock (generated): 1383 lines changed; diff not shown (too large to display)
Cargo.toml (10 lines changed)
@@ -11,6 +11,8 @@ members = [
     "iam",            # Identity and Access Management
     "crypto",         # Cryptography and security features
     "cli/rustfs-gui", # Graphical user interface client
+    "s3select/api",
+    "s3select/query",
 ]
 resolver = "2"

@@ -29,12 +31,15 @@ all = "warn"

 [workspace.dependencies]
 madmin = { path = "./madmin" }
+async-recursion = "1.0.5"
 async-trait = "0.1.86"
 backon = "1.3.0"
 bytes = "1.9.0"
 bytesize = "1.3.0"
-chrono = { version = "0.4.40", features = ["serde"] }
+chrono = { version = "0.4.39", features = ["serde"] }
 clap = { version = "4.5.31", features = ["derive", "env"] }
+datafusion = "46.0.0"
+derive_builder = "0.20.2"
 dioxus = { version = "0.6.3", features = ["router"] }
 dirs = "6.0.0"
 ecstore = { path = "./ecstore" }
@@ -94,6 +99,7 @@ tonic = { version = "0.12.3", features = ["gzip"] }
 tonic-build = "0.12.3"
+tonic-reflection = "0.12"
 tokio-stream = "0.1.17"
 tokio-util = { version = "0.7.13", features = ["io", "compat"] }
 tower = { version = "0.5.2", features = ["timeout"] }
 tracing = "0.1.41"
 tracing-error = "0.2.1"
@@ -112,7 +118,7 @@ md-5 = "0.10.6"
 workers = { path = "./common/workers" }
 test-case = "3.3.1"
 zip = "2.2.3"

+snafu = "0.8.5"


 [profile.wasm-dev]

@@ -151,7 +151,7 @@ fn read_drive_stats(stats_file: &str) -> Result<IOStats> {
 fn read_stat(file_name: &str) -> Result<Vec<u64>> {
     // Open the file
     let path = Path::new(file_name);
-    let file = File::open(&path)?;
+    let file = File::open(path)?;

     // Create a BufReader
     let reader = io::BufReader::new(file);
@@ -161,7 +161,8 @@ fn read_stat(file_name: &str) -> Result<Vec<u64>> {
     if let Some(line) = reader.lines().next() {
         let line = line?;
         // Split the line and parse each token as u64
-        for token in line.trim().split_whitespace() {
+        // https://rust-lang.github.io/rust-clippy/master/index.html#trim_split_whitespace
+        for token in line.split_whitespace() {
             let ui64: u64 = token.parse()?;
             stats.push(ui64);
         }

@@ -20,6 +20,8 @@ log.workspace = true
 async-trait.workspace = true
 bytes.workspace = true
 clap.workspace = true
+csv = "1.3.1"
+datafusion = { workspace = true }
 common.workspace = true
 ecstore.workspace = true
 policy.workspace = true
@@ -45,7 +47,7 @@ serde.workspace = true
 serde_json.workspace = true
 tracing.workspace = true
 time = { workspace = true, features = ["parsing", "formatting", "serde"] }
-tokio-util = { version = "0.7.13", features = ["io", "compat"] }
+tokio-util.workspace = true
 tokio = { workspace = true, features = [
     "rt-multi-thread",
     "macros",
@@ -69,6 +71,8 @@ const-str = { version = "0.6.1", features = ["std", "proc"] }
 atoi = "2.0.0"
 serde_urlencoded = "0.7.1"
 crypto = { path = "../crypto" }
+query = { path = "../s3select/query" }
+api = { path = "../s3select/api" }
 iam = { path = "../iam" }
 jsonwebtoken = "9.3.0"
 tower-http = { version = "0.6.2", features = ["cors"] }

@@ -4,8 +4,14 @@ use super::options::extract_metadata
 use super::options::put_opts;
 use crate::auth::get_condition_values;
 use crate::storage::access::ReqInfo;
+use api::query::Context;
+use api::query::Query;
+use api::server::dbms::DatabaseManagerSystem;
 use bytes::Bytes;
 use common::error::Result;
+use datafusion::arrow::csv::WriterBuilder as CsvWriterBuilder;
+use datafusion::arrow::json::writer::JsonArray;
+use datafusion::arrow::json::WriterBuilder as JsonWriterBuilder;
 use ecstore::bucket::error::BucketMetadataError;
 use ecstore::bucket::metadata::BUCKET_LIFECYCLE_CONFIG;
 use ecstore::bucket::metadata::BUCKET_NOTIFICATION_CONFIG;
@@ -47,6 +53,7 @@ use policy::policy::action::S3Action;
 use policy::policy::BucketPolicy;
 use policy::policy::BucketPolicyArgs;
 use policy::policy::Validator;
+use query::instance::make_rustfsms;
 use s3s::dto::*;
 use s3s::s3_error;
 use s3s::S3Error;
@@ -56,6 +63,8 @@ use s3s::S3;
 use s3s::{S3Request, S3Response};
 use std::fmt::Debug;
 use std::str::FromStr;
+use tokio::sync::mpsc;
+use tokio_stream::wrappers::ReceiverStream;
 use tokio_util::io::ReaderStream;
 use tokio_util::io::StreamReader;
 use tracing::debug;
@@ -1859,6 +1868,69 @@ impl S3 for FS {
         }
         Ok(S3Response::new(PutObjectAclOutput::default()))
     }
+
+    async fn select_object_content(
+        &self,
+        req: S3Request<SelectObjectContentInput>,
+    ) -> S3Result<S3Response<SelectObjectContentOutput>> {
+        info!("handle select_object_content");
+
+        let input = req.input;
+        info!("{:?}", input);
+
+        let db = make_rustfsms(input.clone(), false).await.map_err(|e| {
+            error!("make db failed, {}", e.to_string());
+            s3_error!(InternalError)
+        })?;
+        let query = Query::new(Context { input: input.clone() }, input.request.expression);
+        let result = db.execute(&query).await.map_err(|_| s3_error!(InternalError))?;
+
+        let results = result.result().chunk_result().await.unwrap().to_vec();
+
+        let mut buffer = Vec::new();
+        if input.request.output_serialization.csv.is_some() {
+            let mut csv_writer = CsvWriterBuilder::new().with_header(false).build(&mut buffer);
+            for batch in results {
+                csv_writer
+                    .write(&batch)
+                    .map_err(|e| s3_error!(InternalError, "can't encode output to csv. e: {}", e.to_string()))?;
+            }
+        } else if input.request.output_serialization.json.is_some() {
+            let mut json_writer = JsonWriterBuilder::new()
+                .with_explicit_nulls(true)
+                .build::<_, JsonArray>(&mut buffer);
+            for batch in results {
+                json_writer
+                    .write(&batch)
+                    .map_err(|e| s3_error!(InternalError, "can't encode output to json. e: {}", e.to_string()))?;
+            }
+            json_writer
+                .finish()
+                .map_err(|e| s3_error!(InternalError, "writer output into json error, e: {}", e.to_string()))?;
+        } else {
+            return Err(s3_error!(InvalidArgument, "unknown output format"));
+        }
+
+        let (tx, rx) = mpsc::channel::<S3Result<SelectObjectContentEvent>>(2);
+        let stream = ReceiverStream::new(rx);
+        tokio::spawn(async move {
+            let _ = tx
+                .send(Ok(SelectObjectContentEvent::Cont(ContinuationEvent::default())))
+                .await;
+            let _ = tx
+                .send(Ok(SelectObjectContentEvent::Records(RecordsEvent {
+                    payload: Some(Bytes::from(buffer)),
+                })))
+                .await;
+            let _ = tx.send(Ok(SelectObjectContentEvent::End(EndEvent::default()))).await;

+            drop(tx);
+        });
+
+        Ok(S3Response::new(SelectObjectContentOutput {
+            payload: Some(SelectObjectContentEventStream::new(stream)),
+        }))
+    }
 }

 #[allow(dead_code)]

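The handler above buffers the whole result set, encodes it with Arrow's CSV (or JSON) writer, and ships it as a single RecordsEvent between a Cont and an End event. Below is a minimal standalone sketch of just the encoding step, not part of the diff; the sample schema and function names are invented for illustration.

// --- sketch (not part of the diff) ---
use std::sync::Arc;
use datafusion::arrow::array::Int64Array;
use datafusion::arrow::csv::WriterBuilder as CsvWriterBuilder;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;

// Encode record batches into an in-memory CSV buffer, the same shape that
// becomes the RecordsEvent payload above.
fn encode_batches_as_csv(batches: &[RecordBatch]) -> Vec<u8> {
    let mut buffer = Vec::new();
    let mut writer = CsvWriterBuilder::new().with_header(false).build(&mut buffer);
    for batch in batches {
        writer.write(batch).expect("CSV encoding failed");
    }
    drop(writer); // release the &mut borrow on buffer
    buffer
}

fn sample_batch() -> RecordBatch {
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
    RecordBatch::try_new(schema, vec![Arc::new(Int64Array::from(vec![1, 2, 3]))]).unwrap()
}
// --- end sketch ---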
s3select/api/Cargo.toml (new file, 22 lines)
@@ -0,0 +1,22 @@
[package]
name = "api"
version.workspace = true
edition.workspace = true

[dependencies]
async-trait.workspace = true
bytes.workspace = true
chrono.workspace = true
datafusion = { workspace = true }
ecstore.workspace = true
futures = { workspace = true }
futures-core = "0.3.31"
http.workspace = true
object_store = "0.11.2"
s3s.workspace = true
snafu = { workspace = true, features = ["backtrace"] }
tokio.workspace = true
tokio-util.workspace = true
tracing.workspace = true
transform-stream.workspace = true
url.workspace = true

s3select/api/src/lib.rs (new file, 77 lines)
@@ -0,0 +1,77 @@
use std::fmt::Display;

use datafusion::{common::DataFusionError, sql::sqlparser::parser::ParserError};
use snafu::{Backtrace, Location, Snafu};

pub mod object_store;
pub mod query;
pub mod server;

pub type QueryResult<T> = Result<T, QueryError>;

#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum QueryError {
    Datafusion {
        source: DataFusionError,
        location: Location,
        backtrace: Backtrace,
    },

    #[snafu(display("This feature is not implemented: {}", err))]
    NotImplemented { err: String },

    #[snafu(display("Multi-statement not allowed, found num:{}, sql:{}", num, sql))]
    MultiStatement { num: usize, sql: String },

    #[snafu(display("Failed to build QueryDispatcher. err: {}", err))]
    BuildQueryDispatcher { err: String },

    #[snafu(display("The query has been canceled"))]
    Cancel,

    #[snafu(display("{}", source))]
    Parser { source: ParserError },

    #[snafu(display("Udf does not exist, name:{}.", name))]
    FunctionNotExists { name: String },

    #[snafu(display("Udf already exists, name:{}.", name))]
    FunctionExists { name: String },

    #[snafu(display("Store Error, e:{}.", e))]
    StoreError { e: String },
}

impl From<DataFusionError> for QueryError {
    fn from(value: DataFusionError) -> Self {
        match value {
            DataFusionError::External(e) if e.downcast_ref::<QueryError>().is_some() => *e.downcast::<QueryError>().unwrap(),

            v => Self::Datafusion {
                source: v,
                location: Default::default(),
                backtrace: Backtrace::capture(),
            },
        }
    }
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ResolvedTable {
    // path
    table: String,
}

impl ResolvedTable {
    pub fn table(&self) -> &str {
        &self.table
    }
}

impl Display for ResolvedTable {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { table } = self;
        write!(f, "{table}")
    }
}

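The External arm of that From impl matters in practice: a QueryError that crossed a DataFusion boundary boxed as DataFusionError::External comes back out as itself instead of being re-wrapped in the Datafusion variant. A minimal sketch, not part of the diff:

// --- sketch (not part of the diff) ---
use datafusion::common::DataFusionError;

fn demo_external_roundtrip() {
    let df_err = DataFusionError::External(Box::new(QueryError::Cancel));
    // The From impl above recovers the original QueryError intact.
    let back: QueryError = df_err.into();
    assert!(matches!(back, QueryError::Cancel));
}
// --- end sketch ---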
s3select/api/src/object_store.rs (new file, 177 lines)
@@ -0,0 +1,177 @@
use async_trait::async_trait;
use bytes::Bytes;
use chrono::Utc;
use ecstore::io::READ_BUFFER_SIZE;
use ecstore::new_object_layer_fn;
use ecstore::store::ECStore;
use ecstore::store_api::ObjectIO;
use ecstore::store_api::ObjectOptions;
use ecstore::StorageAPI;
use futures::pin_mut;
use futures::{Stream, StreamExt};
use futures_core::stream::BoxStream;
use http::HeaderMap;
use object_store::path::Path;
use object_store::Attributes;
use object_store::GetOptions;
use object_store::GetResult;
use object_store::ListResult;
use object_store::MultipartUpload;
use object_store::ObjectMeta;
use object_store::ObjectStore;
use object_store::PutMultipartOpts;
use object_store::PutOptions;
use object_store::PutPayload;
use object_store::PutResult;
use object_store::{Error as o_Error, Result};
use s3s::dto::SelectObjectContentInput;
use s3s::s3_error;
use s3s::S3Result;
use std::ops::Range;
use std::sync::Arc;
use tokio_util::io::ReaderStream;
use tracing::info;
use transform_stream::AsyncTryStream;

#[derive(Debug)]
pub struct EcObjectStore {
    input: SelectObjectContentInput,

    store: Arc<ECStore>,
}

impl EcObjectStore {
    pub fn new(input: SelectObjectContentInput) -> S3Result<Self> {
        let Some(store) = new_object_layer_fn() else {
            return Err(s3_error!(InternalError, "ec store not inited"));
        };

        Ok(Self { input, store })
    }
}

impl std::fmt::Display for EcObjectStore {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("EcObjectStore")
    }
}

#[async_trait]
impl ObjectStore for EcObjectStore {
    async fn put_opts(&self, _location: &Path, _payload: PutPayload, _opts: PutOptions) -> Result<PutResult> {
        unimplemented!()
    }

    async fn put_multipart_opts(&self, _location: &Path, _opts: PutMultipartOpts) -> Result<Box<dyn MultipartUpload>> {
        unimplemented!()
    }

    async fn get_opts(&self, location: &Path, _options: GetOptions) -> Result<GetResult> {
        info!("{:?}", location);
        let opts = ObjectOptions::default();
        let h = HeaderMap::new();
        let reader = self
            .store
            .get_object_reader(&self.input.bucket, &self.input.key, None, h, &opts)
            .await
            .map_err(|_| o_Error::NotFound {
                path: format!("{}/{}", self.input.bucket, self.input.key),
                source: "can not get object info".into(),
            })?;

        // let stream = stream::unfold(reader.stream, |mut blob| async move {
        //     match blob.next().await {
        //         Some(Ok(chunk)) => {
        //             let bytes = chunk;
        //             Some((Ok(bytes), blob))
        //         }
        //         _ => None,
        //     }
        // })
        // .boxed();
        let meta = ObjectMeta {
            location: location.clone(),
            last_modified: Utc::now(),
            size: reader.object_info.size,
            e_tag: reader.object_info.etag,
            version: None,
        };
        let attributes = Attributes::default();

        Ok(GetResult {
            payload: object_store::GetResultPayload::Stream(
                bytes_stream(ReaderStream::with_capacity(reader.stream, READ_BUFFER_SIZE), reader.object_info.size).boxed(),
            ),
            meta,
            range: 0..reader.object_info.size,
            attributes,
        })
    }

    async fn get_ranges(&self, _location: &Path, _ranges: &[Range<usize>]) -> Result<Vec<Bytes>> {
        unimplemented!()
    }

    async fn head(&self, location: &Path) -> Result<ObjectMeta> {
        info!("{:?}", location);
        let opts = ObjectOptions::default();
        let info = self
            .store
            .get_object_info(&self.input.bucket, &self.input.key, &opts)
            .await
            .map_err(|_| o_Error::NotFound {
                path: format!("{}/{}", self.input.bucket, self.input.key),
                source: "can not get object info".into(),
            })?;

        Ok(ObjectMeta {
            location: location.clone(),
            last_modified: Utc::now(),
            size: info.size,
            e_tag: info.etag,
            version: None,
        })
    }

    async fn delete(&self, _location: &Path) -> Result<()> {
        unimplemented!()
    }

    fn list(&self, _prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
        unimplemented!()
    }

    async fn list_with_delimiter(&self, _prefix: Option<&Path>) -> Result<ListResult> {
        unimplemented!()
    }

    async fn copy(&self, _from: &Path, _to: &Path) -> Result<()> {
        unimplemented!()
    }

    async fn copy_if_not_exists(&self, _from: &Path, _to: &Path) -> Result<()> {
        unimplemented!()
    }
}

pub fn bytes_stream<S>(stream: S, content_length: usize) -> impl Stream<Item = Result<Bytes>> + Send + 'static
where
    S: Stream<Item = Result<Bytes, std::io::Error>> + Send + 'static,
{
    AsyncTryStream::<Bytes, o_Error, _>::new(|mut y| async move {
        pin_mut!(stream);
        let mut remaining: usize = content_length;
        while let Some(result) = stream.next().await {
            let mut bytes = result.map_err(|e| o_Error::Generic {
                store: "",
                source: Box::new(e),
            })?;
            if bytes.len() > remaining {
                bytes.truncate(remaining);
            }
            remaining -= bytes.len();
            y.yield_ok(bytes).await;
        }
        Ok(())
    })
}

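bytes_stream clamps whatever the underlying reader produces to the object's recorded content length, so trailing bytes from an over-reading chunk never leak to the caller. A usage sketch, not part of the diff:

// --- sketch (not part of the diff) ---
use std::io::Cursor;
use futures::StreamExt;
use tokio_util::io::ReaderStream;

async fn demo_truncation() {
    // A 1 KiB reader, but the object is recorded as 100 bytes long:
    // everything past byte 100 is dropped by bytes_stream.
    let reader = Cursor::new(vec![0u8; 1024]);
    let stream = bytes_stream(ReaderStream::new(reader), 100);
    futures::pin_mut!(stream);

    let mut total = 0;
    while let Some(chunk) = stream.next().await {
        total += chunk.expect("stream error").len();
    }
    assert_eq!(total, 100);
}
// --- end sketch ---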
s3select/api/src/query/analyzer.rs (new file, 12 lines)
@@ -0,0 +1,12 @@
use std::sync::Arc;

use datafusion::logical_expr::LogicalPlan;

use super::session::SessionCtx;
use crate::QueryResult;

pub type AnalyzerRef = Arc<dyn Analyzer + Send + Sync>;

pub trait Analyzer {
    fn analyze(&self, plan: &LogicalPlan, session: &SessionCtx) -> QueryResult<LogicalPlan>;
}

s3select/api/src/query/ast.rs (new file, 8 lines)
@@ -0,0 +1,8 @@
use datafusion::sql::sqlparser::ast::Statement;

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ExtStatement {
    /// ANSI SQL AST node
    SqlStatement(Box<Statement>),
    // we can expand this with custom commands later
}

s3select/api/src/query/datasource/mod.rs (new file, 1 empty line)
@@ -0,0 +1 @@

s3select/api/src/query/dispatcher.rs (new file, 32 lines)
@@ -0,0 +1,32 @@
use std::sync::Arc;

use async_trait::async_trait;

use crate::QueryResult;

use super::{
    execution::{Output, QueryStateMachine},
    logical_planner::Plan,
    Query,
};

#[async_trait]
pub trait QueryDispatcher: Send + Sync {
    // fn create_query_id(&self) -> QueryId;

    // fn query_info(&self, id: &QueryId);

    async fn execute_query(&self, query: &Query) -> QueryResult<Output>;

    async fn build_logical_plan(&self, query_state_machine: Arc<QueryStateMachine>) -> QueryResult<Option<Plan>>;

    async fn execute_logical_plan(&self, logical_plan: Plan, query_state_machine: Arc<QueryStateMachine>) -> QueryResult<Output>;

    async fn build_query_state_machine(&self, query: Query) -> QueryResult<Arc<QueryStateMachine>>;

    // fn running_query_infos(&self) -> Vec<QueryInfo>;

    // fn running_query_status(&self) -> Vec<QueryStatus>;

    // fn cancel_query(&self, id: &QueryId);
}

s3select/api/src/query/execution.rs (new file, 241 lines)
@@ -0,0 +1,241 @@
use std::fmt::Display;
use std::pin::Pin;
use std::sync::atomic::{AtomicPtr, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

use async_trait::async_trait;
use datafusion::arrow::datatypes::{Schema, SchemaRef};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::physical_plan::SendableRecordBatchStream;
use futures::{Stream, StreamExt, TryStreamExt};

use crate::{QueryError, QueryResult};

use super::logical_planner::Plan;
use super::session::SessionCtx;
use super::Query;

pub type QueryExecutionRef = Arc<dyn QueryExecution>;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QueryType {
    Batch,
    Stream,
}

impl Display for QueryType {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Batch => write!(f, "batch"),
            Self::Stream => write!(f, "stream"),
        }
    }
}

#[async_trait]
pub trait QueryExecution: Send + Sync {
    fn query_type(&self) -> QueryType {
        QueryType::Batch
    }
    // Start the execution
    async fn start(&self) -> QueryResult<Output>;
    // Stop the execution
    fn cancel(&self) -> QueryResult<()>;
}

pub enum Output {
    StreamData(SendableRecordBatchStream),
    Nil(()),
}

impl Output {
    pub fn schema(&self) -> SchemaRef {
        match self {
            Self::StreamData(stream) => stream.schema(),
            Self::Nil(_) => Arc::new(Schema::empty()),
        }
    }

    pub async fn chunk_result(self) -> QueryResult<Vec<RecordBatch>> {
        match self {
            Self::Nil(_) => Ok(vec![]),
            Self::StreamData(stream) => {
                let schema = stream.schema();
                let mut res: Vec<RecordBatch> = stream.try_collect::<Vec<RecordBatch>>().await?;
                if res.is_empty() {
                    res.push(RecordBatch::new_empty(schema));
                }
                Ok(res)
            }
        }
    }

    pub async fn num_rows(self) -> usize {
        match self.chunk_result().await {
            Ok(rb) => rb.iter().map(|e| e.num_rows()).sum(),
            Err(_) => 0,
        }
    }

    /// Returns the number of records affected by the query operation
    ///
    /// If it is a select statement, returns the number of rows in the result set
    ///
    /// -1 means unknown
    ///
    /// panic! when StreamData's number of records is greater than i64::MAX
    pub async fn affected_rows(self) -> i64 {
        self.num_rows().await as i64
    }
}

impl Stream for Output {
    type Item = std::result::Result<RecordBatch, QueryError>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        match this {
            Output::StreamData(stream) => stream.poll_next_unpin(cx).map_err(|e| e.into()),
            Output::Nil(_) => Poll::Ready(None),
        }
    }
}

#[async_trait]
pub trait QueryExecutionFactory {
    async fn create_query_execution(
        &self,
        plan: Plan,
        query_state_machine: QueryStateMachineRef,
    ) -> QueryResult<QueryExecutionRef>;
}

pub type QueryStateMachineRef = Arc<QueryStateMachine>;

pub struct QueryStateMachine {
    pub session: SessionCtx,
    pub query: Query,

    state: AtomicPtr<QueryState>,
    start: Instant,
}

impl QueryStateMachine {
    pub fn begin(query: Query, session: SessionCtx) -> Self {
        Self {
            session,
            query,
            state: AtomicPtr::new(Box::into_raw(Box::new(QueryState::ACCEPTING))),
            start: Instant::now(),
        }
    }

    pub fn begin_analyze(&self) {
        // TODO record time
        self.translate_to(Box::new(QueryState::RUNNING(RUNNING::ANALYZING)));
    }

    pub fn end_analyze(&self) {
        // TODO record time
    }

    pub fn begin_optimize(&self) {
        // TODO record time
        self.translate_to(Box::new(QueryState::RUNNING(RUNNING::OPTMIZING)));
    }

    pub fn end_optimize(&self) {
        // TODO
    }

    pub fn begin_schedule(&self) {
        // TODO
        self.translate_to(Box::new(QueryState::RUNNING(RUNNING::SCHEDULING)));
    }

    pub fn end_schedule(&self) {
        // TODO
    }

    pub fn finish(&self) {
        // TODO
        self.translate_to(Box::new(QueryState::DONE(DONE::FINISHED)));
    }

    pub fn cancel(&self) {
        // TODO
        self.translate_to(Box::new(QueryState::DONE(DONE::CANCELLED)));
    }

    pub fn fail(&self) {
        // TODO
        self.translate_to(Box::new(QueryState::DONE(DONE::FAILED)));
    }

    pub fn state(&self) -> &QueryState {
        unsafe { &*self.state.load(Ordering::Relaxed) }
    }

    pub fn duration(&self) -> Duration {
        self.start.elapsed()
    }

    fn translate_to(&self, state: Box<QueryState>) {
        self.state.store(Box::into_raw(state), Ordering::Relaxed);
    }
}

#[derive(Debug, Clone)]
pub enum QueryState {
    ACCEPTING,
    RUNNING(RUNNING),
    DONE(DONE),
}

impl AsRef<str> for QueryState {
    fn as_ref(&self) -> &str {
        match self {
            QueryState::ACCEPTING => "ACCEPTING",
            QueryState::RUNNING(e) => e.as_ref(),
            QueryState::DONE(e) => e.as_ref(),
        }
    }
}

#[derive(Debug, Clone)]
pub enum RUNNING {
    DISPATCHING,
    ANALYZING,
    OPTMIZING,
    SCHEDULING,
}

impl AsRef<str> for RUNNING {
    fn as_ref(&self) -> &str {
        match self {
            Self::DISPATCHING => "DISPATCHING",
            Self::ANALYZING => "ANALYZING",
            Self::OPTMIZING => "OPTMIZING",
            Self::SCHEDULING => "SCHEDULING",
        }
    }
}

#[derive(Debug, Clone)]
pub enum DONE {
    FINISHED,
    FAILED,
    CANCELLED,
}

impl AsRef<str> for DONE {
    fn as_ref(&self) -> &str {
        match self {
            Self::FINISHED => "FINISHED",
            Self::FAILED => "FAILED",
            Self::CANCELLED => "CANCELLED",
        }
    }
}

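Consuming an Output is a one-shot operation: chunk_result() takes self by value and drains the stream. A small sketch, not part of the diff:

// --- sketch (not part of the diff) ---
async fn total_rows(output: Output) -> QueryResult<usize> {
    // Nil yields no batches at all, while an exhausted StreamData still
    // yields one empty batch so the schema is preserved.
    let batches = output.chunk_result().await?;
    Ok(batches.iter().map(|b| b.num_rows()).sum())
}
// --- end sketch ---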
s3select/api/src/query/function.rs (new file, 23 lines)
@@ -0,0 +1,23 @@
use std::collections::HashSet;
use std::sync::Arc;

use datafusion::logical_expr::{AggregateUDF, ScalarUDF, WindowUDF};

use crate::QueryResult;

pub type FuncMetaManagerRef = Arc<dyn FunctionMetadataManager + Send + Sync>;
pub trait FunctionMetadataManager {
    fn register_udf(&mut self, udf: ScalarUDF) -> QueryResult<()>;

    fn register_udaf(&mut self, udaf: AggregateUDF) -> QueryResult<()>;

    fn register_udwf(&mut self, udwf: WindowUDF) -> QueryResult<()>;

    fn udf(&self, name: &str) -> QueryResult<Arc<ScalarUDF>>;

    fn udaf(&self, name: &str) -> QueryResult<Arc<AggregateUDF>>;

    fn udwf(&self, name: &str) -> QueryResult<Arc<WindowUDF>>;

    fn udfs(&self) -> HashSet<String>;
}

s3select/api/src/query/logical_planner.rs (new file, 40 lines)
@@ -0,0 +1,40 @@
use async_trait::async_trait;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::logical_expr::LogicalPlan as DFPlan;

use crate::QueryResult;

use super::ast::ExtStatement;
use super::session::SessionCtx;

#[derive(Clone)]
pub enum Plan {
    // only query SQL is supported for now
    /// Query plan
    Query(QueryPlan),
}

impl Plan {
    pub fn schema(&self) -> SchemaRef {
        match self {
            Self::Query(p) => SchemaRef::from(p.df_plan.schema().as_ref().to_owned()),
        }
    }
}

#[derive(Debug, Clone)]
pub struct QueryPlan {
    pub df_plan: DFPlan,
    pub is_tag_scan: bool,
}

impl QueryPlan {
    pub fn is_explain(&self) -> bool {
        matches!(self.df_plan, DFPlan::Explain(_) | DFPlan::Analyze(_))
    }
}

#[async_trait]
pub trait LogicalPlanner {
    async fn create_logical_plan(&self, statement: ExtStatement, session: &SessionCtx) -> QueryResult<Plan>;
}

s3select/api/src/query/mod.rs (new file, 41 lines)
@@ -0,0 +1,41 @@
use s3s::dto::SelectObjectContentInput;

pub mod analyzer;
pub mod ast;
pub mod datasource;
pub mod dispatcher;
pub mod execution;
pub mod function;
pub mod logical_planner;
pub mod optimizer;
pub mod parser;
pub mod physical_planner;
pub mod scheduler;
pub mod session;

#[derive(Clone)]
pub struct Context {
    // maybe we need to transfer some more info here?
    pub input: SelectObjectContentInput,
}

#[derive(Clone)]
pub struct Query {
    context: Context,
    content: String,
}

impl Query {
    #[inline(always)]
    pub fn new(context: Context, content: String) -> Self {
        Self { context, content }
    }

    pub fn context(&self) -> &Context {
        &self.context
    }

    pub fn content(&self) -> &str {
        self.content.as_str()
    }
}

s3select/api/src/query/optimizer.rs (new file, 15 lines)
@@ -0,0 +1,15 @@
use std::sync::Arc;

use async_trait::async_trait;
use datafusion::physical_plan::ExecutionPlan;

use super::logical_planner::QueryPlan;
use super::session::SessionCtx;
use crate::QueryResult;

pub type OptimizerRef = Arc<dyn Optimizer + Send + Sync>;

#[async_trait]
pub trait Optimizer {
    async fn optimize(&self, plan: &QueryPlan, session: &SessionCtx) -> QueryResult<Arc<dyn ExecutionPlan>>;
}

s3select/api/src/query/parser.rs (new file, 8 lines)
@@ -0,0 +1,8 @@
use std::collections::VecDeque;

use super::ast::ExtStatement;
use crate::QueryResult;

pub trait Parser {
    fn parse(&self, sql: &str) -> QueryResult<VecDeque<ExtStatement>>;
}

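The trait leaves the concrete SQL parser open. One plausible implementation, not part of the diff (the dialect choice and the DefaultParser name are assumptions), delegates to the sqlparser crate that DataFusion re-exports and wraps each statement in ExtStatement::SqlStatement:

// --- sketch (not part of the diff) ---
use std::collections::VecDeque;
use datafusion::sql::sqlparser::{dialect::GenericDialect, parser::Parser as SqlParser};

use super::ast::ExtStatement;
use crate::{QueryError, QueryResult};

struct DefaultParser;

impl Parser for DefaultParser {
    fn parse(&self, sql: &str) -> QueryResult<VecDeque<ExtStatement>> {
        // Parse into sqlparser AST statements, mapping parse failures onto
        // the QueryError::Parser variant defined in lib.rs.
        let statements = SqlParser::parse_sql(&GenericDialect {}, sql)
            .map_err(|source| QueryError::Parser { source })?;
        Ok(statements
            .into_iter()
            .map(|s| ExtStatement::SqlStatement(Box::new(s)))
            .collect())
    }
}
// --- end sketch ---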
s3select/api/src/query/physical_planner.rs (new file, 21 lines)
@@ -0,0 +1,21 @@
use std::sync::Arc;

use async_trait::async_trait;
use datafusion::logical_expr::LogicalPlan;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_planner::ExtensionPlanner;

use super::session::SessionCtx;
use crate::QueryResult;

#[async_trait]
pub trait PhysicalPlanner {
    /// Given a `LogicalPlan`, create an `ExecutionPlan` suitable for execution
    async fn create_physical_plan(
        &self,
        logical_plan: &LogicalPlan,
        session_state: &SessionCtx,
    ) -> QueryResult<Arc<dyn ExecutionPlan>>;

    fn inject_physical_transform_rule(&mut self, rule: Arc<dyn ExtensionPlanner + Send + Sync>);
}

s3select/api/src/query/scheduler.rs (new file, 32 lines)
@@ -0,0 +1,32 @@
use std::sync::Arc;

use async_trait::async_trait;
use datafusion::common::Result;
use datafusion::execution::context::TaskContext;
use datafusion::physical_plan::{ExecutionPlan, SendableRecordBatchStream};

pub type SchedulerRef = Arc<dyn Scheduler + Send + Sync>;

#[async_trait]
pub trait Scheduler {
    /// Schedule the provided [`ExecutionPlan`] on this [`Scheduler`].
    ///
    /// Returns an [`ExecutionResults`] that can be used to receive results as they are produced,
    /// as a [`futures::Stream`] of [`RecordBatch`]
    async fn schedule(&self, plan: Arc<dyn ExecutionPlan>, context: Arc<TaskContext>) -> Result<ExecutionResults>;
}

pub struct ExecutionResults {
    stream: SendableRecordBatchStream,
}

impl ExecutionResults {
    pub fn new(stream: SendableRecordBatchStream) -> Self {
        Self { stream }
    }

    /// Returns the [`SendableRecordBatchStream`] of this execution
    pub fn stream(self) -> SendableRecordBatchStream {
        self.stream
    }
}

s3select/api/src/query/session.rs (new file, 86 lines)
@@ -0,0 +1,86 @@
use std::sync::Arc;

use bytes::Bytes;
use datafusion::{
    execution::{context::SessionState, runtime_env::RuntimeEnvBuilder, SessionStateBuilder},
    prelude::SessionContext,
};
use object_store::{memory::InMemory, path::Path, ObjectStore};
use tracing::error;

use crate::{object_store::EcObjectStore, QueryError, QueryResult};

use super::Context;

#[derive(Clone)]
pub struct SessionCtx {
    _desc: Arc<SessionCtxDesc>,
    inner: SessionState,
}

impl SessionCtx {
    pub fn inner(&self) -> &SessionState {
        &self.inner
    }
}

#[derive(Clone)]
pub struct SessionCtxDesc {
    // maybe we need some info
}

#[derive(Default)]
pub struct SessionCtxFactory {
    pub is_test: bool,
}

impl SessionCtxFactory {
    pub async fn create_session_ctx(&self, context: &Context) -> QueryResult<SessionCtx> {
        let df_session_ctx = self.build_df_session_context(context).await?;

        Ok(SessionCtx {
            _desc: Arc::new(SessionCtxDesc {}),
            inner: df_session_ctx.state(),
        })
    }

    async fn build_df_session_context(&self, context: &Context) -> QueryResult<SessionContext> {
        let path = format!("s3://{}", context.input.bucket);
        let store_url = url::Url::parse(&path).unwrap();
        let rt = RuntimeEnvBuilder::new().build()?;
        let df_session_state = SessionStateBuilder::new()
            .with_runtime_env(Arc::new(rt))
            .with_default_features();

        let df_session_state = if self.is_test {
            let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
            let data = b"id,name,age,department,salary
1,Alice,25,HR,5000
2,Bob,30,IT,6000
3,Charlie,35,Finance,7000
4,Diana,22,Marketing,4500
5,Eve,28,IT,5500
6,Frank,40,Finance,8000
7,Grace,26,HR,5200
8,Henry,32,IT,6200
9,Ivy,24,Marketing,4800
10,Jack,38,Finance,7500";
            let data_bytes = Bytes::from(data.to_vec());
            let path = Path::from(context.input.key.clone());
            store.put(&path, data_bytes.into()).await.map_err(|e| {
                error!("put data into memory failed: {}", e.to_string());
                QueryError::StoreError { e: e.to_string() }
            })?;

            df_session_state.with_object_store(&store_url, Arc::new(store)).build()
        } else {
            let store =
                EcObjectStore::new(context.input.clone()).map_err(|_| QueryError::NotImplemented { err: String::new() })?;
            df_session_state.with_object_store(&store_url, Arc::new(store)).build()
        };

        let df_session_ctx = SessionContext::new_with_state(df_session_state);

        Ok(df_session_ctx)
    }
}

s3select/api/src/server/dbms.rs (new file, 41 lines)
@@ -0,0 +1,41 @@
use async_trait::async_trait;

use crate::{
    query::{
        execution::{Output, QueryStateMachineRef},
        logical_planner::Plan,
        Query,
    },
    QueryResult,
};

pub struct QueryHandle {
    query: Query,
    result: Output,
}

impl QueryHandle {
    pub fn new(query: Query, result: Output) -> Self {
        Self { query, result }
    }

    pub fn query(&self) -> &Query {
        &self.query
    }

    pub fn result(self) -> Output {
        self.result
    }
}

#[async_trait]
pub trait DatabaseManagerSystem {
    async fn execute(&self, query: &Query) -> QueryResult<QueryHandle>;
    async fn build_query_state_machine(&self, query: Query) -> QueryResult<QueryStateMachineRef>;
    async fn build_logical_plan(&self, query_state_machine: QueryStateMachineRef) -> QueryResult<Option<Plan>>;
    async fn execute_logical_plan(
        &self,
        logical_plan: Plan,
        query_state_machine: QueryStateMachineRef,
    ) -> QueryResult<QueryHandle>;
}

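This is the call shape ecfs.rs uses: execute() yields a QueryHandle, and result() surrenders the Output stream. A sketch, not part of the diff:

// --- sketch (not part of the diff) ---
use datafusion::arrow::record_batch::RecordBatch;

async fn run_to_batches(db: &impl DatabaseManagerSystem, query: Query) -> QueryResult<Vec<RecordBatch>> {
    let handle = db.execute(&query).await?;
    // result() consumes the handle; chunk_result() drains the stream.
    handle.result().chunk_result().await
}
// --- end sketch ---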
s3select/api/src/server/mod.rs (new file, 1 line)
@@ -0,0 +1 @@
pub mod dbms;

s3select/query/Cargo.toml (new file, 17 lines)
@@ -0,0 +1,17 @@
[package]
name = "query"
version.workspace = true
edition.workspace = true

[dependencies]
api = { path = "../api" }
async-recursion = { workspace = true }
async-trait.workspace = true
datafusion = { workspace = true }
derive_builder = { workspace = true }
futures = { workspace = true }
parking_lot = { version = "0.12.1" }
s3s.workspace = true
snafu = { workspace = true, features = ["backtrace"] }
tokio = { workspace = true }
tracing = { workspace = true }

s3select/query/src/data_source/mod.rs (new file, 1 line)
@@ -0,0 +1 @@
pub mod table_source;

s3select/query/src/data_source/table_source.rs (new file, 138 lines)
@@ -0,0 +1,138 @@
use std::any::Any;
use std::borrow::Cow;
use std::fmt::Display;
use std::sync::Arc;
use std::write;

use async_trait::async_trait;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::common::Result as DFResult;
use datafusion::datasource::listing::ListingTable;
use datafusion::datasource::{provider_as_source, TableProvider};
use datafusion::error::DataFusionError;
use datafusion::logical_expr::{LogicalPlan, LogicalPlanBuilder, TableProviderFilterPushDown, TableSource};
use datafusion::prelude::Expr;
use datafusion::sql::TableReference;
use tracing::debug;

pub const TEMP_LOCATION_TABLE_NAME: &str = "external_location_table";

pub struct TableSourceAdapter {
    database_name: String,
    table_name: String,
    table_handle: TableHandle,

    plan: LogicalPlan,
}

impl TableSourceAdapter {
    pub fn try_new(
        table_ref: impl Into<TableReference>,
        table_name: impl Into<String>,
        table_handle: impl Into<TableHandle>,
    ) -> Result<Self, DataFusionError> {
        let table_name: String = table_name.into();

        let table_handle = table_handle.into();
        let plan = match &table_handle {
            // TableScan
            TableHandle::External(t) => {
                let table_source = provider_as_source(t.clone());
                LogicalPlanBuilder::scan(table_ref, table_source, None)?.build()?
            }
            // TableScan
            TableHandle::TableProvider(t) => {
                let table_source = provider_as_source(t.clone());
                if let Some(plan) = table_source.get_logical_plan() {
                    LogicalPlanBuilder::from(plan.into_owned()).build()?
                } else {
                    LogicalPlanBuilder::scan(table_ref, table_source, None)?.build()?
                }
            }
        };

        debug!("Table source logical plan node of {}:\n{}", table_name, plan.display_indent_schema());

        Ok(Self {
            database_name: "default_db".to_string(),
            table_name,
            table_handle,
            plan,
        })
    }

    pub fn database_name(&self) -> &str {
        &self.database_name
    }

    pub fn table_name(&self) -> &str {
        &self.table_name
    }

    pub fn table_handle(&self) -> &TableHandle {
        &self.table_handle
    }
}

#[async_trait]
impl TableSource for TableSourceAdapter {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        self.table_handle.schema()
    }

    fn supports_filters_pushdown(&self, filter: &[&Expr]) -> DFResult<Vec<TableProviderFilterPushDown>> {
        self.table_handle.supports_filters_pushdown(filter)
    }

    /// Called by [`InlineTableScan`]
    fn get_logical_plan(&self) -> Option<Cow<LogicalPlan>> {
        Some(Cow::Owned(self.plan.clone()))
    }
}

#[derive(Clone)]
pub enum TableHandle {
    TableProvider(Arc<dyn TableProvider>),
    External(Arc<ListingTable>),
}

impl TableHandle {
    pub fn schema(&self) -> SchemaRef {
        match self {
            Self::External(t) => t.schema(),
            Self::TableProvider(t) => t.schema(),
        }
    }

    pub fn supports_filters_pushdown(&self, filter: &[&Expr]) -> DFResult<Vec<TableProviderFilterPushDown>> {
        match self {
            Self::External(t) => t.supports_filters_pushdown(filter),
            Self::TableProvider(t) => t.supports_filters_pushdown(filter),
        }
    }
}

impl From<Arc<dyn TableProvider>> for TableHandle {
    fn from(value: Arc<dyn TableProvider>) -> Self {
        TableHandle::TableProvider(value)
    }
}

impl From<Arc<ListingTable>> for TableHandle {
    fn from(value: Arc<ListingTable>) -> Self {
        TableHandle::External(value)
    }
}

impl Display for TableHandle {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::External(e) => write!(f, "External({:?})", e.table_paths()),
            Self::TableProvider(_) => write!(f, "TableProvider"),
        }
    }
}

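Thanks to the two From impls, try_new accepts either handle flavor directly. A sketch, not part of the diff (reusing TEMP_LOCATION_TABLE_NAME as both the table reference and the display name is an assumption):

// --- sketch (not part of the diff) ---
use std::sync::Arc;
use datafusion::datasource::TableProvider;
use datafusion::error::DataFusionError;

fn adapt(provider: Arc<dyn TableProvider>) -> Result<TableSourceAdapter, DataFusionError> {
    // try_new eagerly builds the TableScan plan that get_logical_plan()
    // later hands to DataFusion's InlineTableScan rule.
    TableSourceAdapter::try_new(TEMP_LOCATION_TABLE_NAME, TEMP_LOCATION_TABLE_NAME, provider)
}
// --- end sketch ---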
s3select/query/src/dispatcher/manager.rs (new file, 271 lines)
@@ -0,0 +1,271 @@
use std::{
    pin::Pin,
    sync::Arc,
    task::{Context, Poll},
};

use api::{
    query::{
        ast::ExtStatement,
        dispatcher::QueryDispatcher,
        execution::{Output, QueryStateMachine},
        function::FuncMetaManagerRef,
        logical_planner::{LogicalPlanner, Plan},
        parser::Parser,
        session::{SessionCtx, SessionCtxFactory},
        Query,
    },
    QueryError, QueryResult,
};
use async_trait::async_trait;
use datafusion::{
    arrow::{datatypes::SchemaRef, record_batch::RecordBatch},
    config::CsvOptions,
    datasource::{
        file_format::{csv::CsvFormat, json::JsonFormat, parquet::ParquetFormat},
        listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl},
    },
    error::Result as DFResult,
    execution::{RecordBatchStream, SendableRecordBatchStream},
};
use futures::{Stream, StreamExt};
use s3s::dto::SelectObjectContentInput;

use crate::{
    execution::factory::QueryExecutionFactoryRef,
    metadata::{base_table::BaseTableProvider, ContextProviderExtension, MetadataProvider, TableHandleProviderRef},
    sql::logical::planner::DefaultLogicalPlanner,
};

#[derive(Clone)]
pub struct SimpleQueryDispatcher {
    input: SelectObjectContentInput,
    // client for the default tenant
    _default_table_provider: TableHandleProviderRef,
    session_factory: Arc<SessionCtxFactory>,
    // parser
    parser: Arc<dyn Parser + Send + Sync>,
    // query execution factory
    query_execution_factory: QueryExecutionFactoryRef,
    func_manager: FuncMetaManagerRef,
}

#[async_trait]
impl QueryDispatcher for SimpleQueryDispatcher {
    async fn execute_query(&self, query: &Query) -> QueryResult<Output> {
        let query_state_machine = { self.build_query_state_machine(query.clone()).await? };

        let logical_plan = self.build_logical_plan(query_state_machine.clone()).await?;
        let logical_plan = match logical_plan {
            Some(plan) => plan,
            None => return Ok(Output::Nil(())),
        };
        let result = self.execute_logical_plan(logical_plan, query_state_machine).await?;
        Ok(result)
    }

    async fn build_logical_plan(&self, query_state_machine: Arc<QueryStateMachine>) -> QueryResult<Option<Plan>> {
        let session = &query_state_machine.session;
        let query = &query_state_machine.query;

        let scheme_provider = self.build_scheme_provider(session).await?;

        let logical_planner = DefaultLogicalPlanner::new(&scheme_provider);

        let statements = self.parser.parse(query.content())?;

        // multi-statement queries are not allowed
        if statements.len() > 1 {
            return Err(QueryError::MultiStatement {
                num: statements.len(),
                sql: query_state_machine.query.content().to_string(),
            });
        }

        let stmt = match statements.front() {
            Some(stmt) => stmt.clone(),
            None => return Ok(None),
        };

        let logical_plan = self
            .statement_to_logical_plan(stmt, &logical_planner, query_state_machine)
            .await?;
        Ok(Some(logical_plan))
    }

    async fn execute_logical_plan(&self, logical_plan: Plan, query_state_machine: Arc<QueryStateMachine>) -> QueryResult<Output> {
        self.execute_logical_plan(logical_plan, query_state_machine).await
    }

    async fn build_query_state_machine(&self, query: Query) -> QueryResult<Arc<QueryStateMachine>> {
        let session = self.session_factory.create_session_ctx(query.context()).await?;

        let query_state_machine = Arc::new(QueryStateMachine::begin(query, session));
        Ok(query_state_machine)
    }
}

impl SimpleQueryDispatcher {
    async fn statement_to_logical_plan<S: ContextProviderExtension + Send + Sync>(
        &self,
        stmt: ExtStatement,
        logical_planner: &DefaultLogicalPlanner<'_, S>,
        query_state_machine: Arc<QueryStateMachine>,
    ) -> QueryResult<Plan> {
        // begin analyze
        query_state_machine.begin_analyze();
        let logical_plan = logical_planner
            .create_logical_plan(stmt, &query_state_machine.session)
            .await?;
        query_state_machine.end_analyze();

        Ok(logical_plan)
    }

    async fn execute_logical_plan(&self, logical_plan: Plan, query_state_machine: Arc<QueryStateMachine>) -> QueryResult<Output> {
        let execution = self
            .query_execution_factory
            .create_query_execution(logical_plan, query_state_machine.clone())
            .await?;

        match execution.start().await {
            Ok(Output::StreamData(stream)) => Ok(Output::StreamData(Box::pin(TrackedRecordBatchStream { inner: stream }))),
            Ok(nil @ Output::Nil(_)) => Ok(nil),
            Err(err) => Err(err),
        }
    }

    async fn build_scheme_provider(&self, session: &SessionCtx) -> QueryResult<MetadataProvider> {
        let path = format!("s3://{}/{}", self.input.bucket, self.input.key);
        let table_path = ListingTableUrl::parse(path)?;
        let listing_options = if self.input.request.input_serialization.csv.is_some() {
            let file_format = CsvFormat::default().with_options(CsvOptions::default().with_has_header(false));
            ListingOptions::new(Arc::new(file_format)).with_file_extension(".csv")
        } else if self.input.request.input_serialization.parquet.is_some() {
            let file_format = ParquetFormat::new();
            ListingOptions::new(Arc::new(file_format)).with_file_extension(".parquet")
        } else if self.input.request.input_serialization.json.is_some() {
            let file_format = JsonFormat::default();
            ListingOptions::new(Arc::new(file_format)).with_file_extension(".json")
        } else {
            return Err(QueryError::NotImplemented {
                err: "unsupported input file type".to_string(),
            });
        };

        let resolve_schema = listing_options.infer_schema(session.inner(), &table_path).await?;
        let config = ListingTableConfig::new(table_path)
            .with_listing_options(listing_options)
            .with_schema(resolve_schema);
        let provider = Arc::new(ListingTable::try_new(config)?);
        let current_session_table_provider = self.build_table_handle_provider()?;
        let metadata_provider =
            MetadataProvider::new(provider, current_session_table_provider, self.func_manager.clone(), session.clone());

        Ok(metadata_provider)
    }

    fn build_table_handle_provider(&self) -> QueryResult<TableHandleProviderRef> {
        let current_session_table_provider: Arc<BaseTableProvider> = Arc::new(BaseTableProvider::default());

        Ok(current_session_table_provider)
    }
}

pub struct TrackedRecordBatchStream {
    inner: SendableRecordBatchStream,
}

impl RecordBatchStream for TrackedRecordBatchStream {
    fn schema(&self) -> SchemaRef {
        self.inner.schema()
    }
}

impl Stream for TrackedRecordBatchStream {
    type Item = DFResult<RecordBatch>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        self.inner.poll_next_unpin(cx)
    }
}

#[derive(Default, Clone)]
pub struct SimpleQueryDispatcherBuilder {
    input: Option<SelectObjectContentInput>,
    default_table_provider: Option<TableHandleProviderRef>,
    session_factory: Option<Arc<SessionCtxFactory>>,
    parser: Option<Arc<dyn Parser + Send + Sync>>,

    query_execution_factory: Option<QueryExecutionFactoryRef>,

    func_manager: Option<FuncMetaManagerRef>,
}

impl SimpleQueryDispatcherBuilder {
    pub fn with_input(mut self, input: SelectObjectContentInput) -> Self {
        self.input = Some(input);
        self
    }
    pub fn with_default_table_provider(mut self, default_table_provider: TableHandleProviderRef) -> Self {
        self.default_table_provider = Some(default_table_provider);
        self
    }

    pub fn with_session_factory(mut self, session_factory: Arc<SessionCtxFactory>) -> Self {
        self.session_factory = Some(session_factory);
        self
    }

    pub fn with_parser(mut self, parser: Arc<dyn Parser + Send + Sync>) -> Self {
        self.parser = Some(parser);
        self
    }

    pub fn with_query_execution_factory(mut self, query_execution_factory: QueryExecutionFactoryRef) -> Self {
        self.query_execution_factory = Some(query_execution_factory);
        self
    }

    pub fn with_func_manager(mut self, func_manager: FuncMetaManagerRef) -> Self {
        self.func_manager = Some(func_manager);
        self
    }

    pub fn build(self) -> QueryResult<Arc<SimpleQueryDispatcher>> {
        let input = self.input.ok_or_else(|| QueryError::BuildQueryDispatcher {
            err: "missing input".to_string(),
        })?;

        let session_factory = self.session_factory.ok_or_else(|| QueryError::BuildQueryDispatcher {
            err: "missing session_factory".to_string(),
        })?;

        let parser = self.parser.ok_or_else(|| QueryError::BuildQueryDispatcher {
            err: "missing parser".to_string(),
        })?;

        let query_execution_factory = self.query_execution_factory.ok_or_else(|| QueryError::BuildQueryDispatcher {
            err: "missing query_execution_factory".to_string(),
        })?;

        let func_manager = self.func_manager.ok_or_else(|| QueryError::BuildQueryDispatcher {
            err: "missing func_manager".to_string(),
        })?;

        let default_table_provider = self.default_table_provider.ok_or_else(|| QueryError::BuildQueryDispatcher {
            err: "missing default_table_provider".to_string(),
        })?;

        let dispatcher = Arc::new(SimpleQueryDispatcher {
            input,
            _default_table_provider: default_table_provider,
            session_factory,
            parser,
            query_execution_factory,
            func_manager,
        });

        Ok(dispatcher)
    }
}

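Every builder field is mandatory; build() reports the first missing one as a BuildQueryDispatcher error. A wiring sketch, not part of the diff (where the arguments come from is an assumption):

// --- sketch (not part of the diff) ---
use std::sync::Arc;
use api::query::{function::FuncMetaManagerRef, parser::Parser, session::SessionCtxFactory};
use api::QueryResult;
use s3s::dto::SelectObjectContentInput;

fn wire_dispatcher(
    input: SelectObjectContentInput,
    parser: Arc<dyn Parser + Send + Sync>,
    executions: QueryExecutionFactoryRef,
    funcs: FuncMetaManagerRef,
    tables: TableHandleProviderRef,
) -> QueryResult<Arc<SimpleQueryDispatcher>> {
    SimpleQueryDispatcherBuilder::default()
        .with_input(input)
        .with_session_factory(Arc::new(SessionCtxFactory::default()))
        .with_parser(parser)
        .with_query_execution_factory(executions)
        .with_func_manager(funcs)
        .with_default_table_provider(tables)
        .build()
}
// --- end sketch ---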
s3select/query/src/dispatcher/mod.rs (new file, 1 line)
@@ -0,0 +1 @@
pub mod manager;

s3select/query/src/execution/factory.rs (new file, 46 lines)
@@ -0,0 +1,46 @@
use std::sync::Arc;

use api::{
    query::{
        execution::{QueryExecutionFactory, QueryExecutionRef, QueryStateMachineRef},
        logical_planner::Plan,
        optimizer::Optimizer,
        scheduler::SchedulerRef,
    },
    QueryError,
};
use async_trait::async_trait;

use super::query::SqlQueryExecution;

pub type QueryExecutionFactoryRef = Arc<dyn QueryExecutionFactory + Send + Sync>;

pub struct SqlQueryExecutionFactory {
    optimizer: Arc<dyn Optimizer + Send + Sync>,
    scheduler: SchedulerRef,
}

impl SqlQueryExecutionFactory {
    #[inline(always)]
    pub fn new(optimizer: Arc<dyn Optimizer + Send + Sync>, scheduler: SchedulerRef) -> Self {
        Self { optimizer, scheduler }
    }
}

#[async_trait]
impl QueryExecutionFactory for SqlQueryExecutionFactory {
    async fn create_query_execution(
        &self,
        plan: Plan,
        state_machine: QueryStateMachineRef,
    ) -> Result<QueryExecutionRef, QueryError> {
        match plan {
            Plan::Query(query_plan) => Ok(Arc::new(SqlQueryExecution::new(
                state_machine,
                query_plan,
                self.optimizer.clone(),
                self.scheduler.clone(),
            ))),
        }
    }
}

s3select/query/src/execution/mod.rs (new file, 3 lines)
@@ -0,0 +1,3 @@
pub mod factory;
pub mod query;
pub mod scheduler;

s3select/query/src/execution/query.rs (new file, 92 lines)
@@ -0,0 +1,92 @@
use std::sync::Arc;

use api::query::execution::{Output, QueryExecution, QueryStateMachineRef};
use api::query::logical_planner::QueryPlan;
use api::query::optimizer::Optimizer;
use api::query::scheduler::SchedulerRef;
use api::{QueryError, QueryResult};
use async_trait::async_trait;
use futures::stream::AbortHandle;
use parking_lot::Mutex;
use tracing::debug;

pub struct SqlQueryExecution {
    query_state_machine: QueryStateMachineRef,
    plan: QueryPlan,
    optimizer: Arc<dyn Optimizer + Send + Sync>,
    scheduler: SchedulerRef,

    abort_handle: Mutex<Option<AbortHandle>>,
}

impl SqlQueryExecution {
    pub fn new(
        query_state_machine: QueryStateMachineRef,
        plan: QueryPlan,
        optimizer: Arc<dyn Optimizer + Send + Sync>,
        scheduler: SchedulerRef,
    ) -> Self {
        Self {
            query_state_machine,
            plan,
            optimizer,
            scheduler,
            abort_handle: Mutex::new(None),
        }
    }

    async fn start(&self) -> QueryResult<Output> {
        // begin optimize
        self.query_state_machine.begin_optimize();
        let physical_plan = self.optimizer.optimize(&self.plan, &self.query_state_machine.session).await?;
        self.query_state_machine.end_optimize();

        // begin schedule
        self.query_state_machine.begin_schedule();
        let stream = self
            .scheduler
            .schedule(physical_plan.clone(), self.query_state_machine.session.inner().task_ctx())
            .await?
            .stream();

        debug!("Successfully built the result stream.");
        self.query_state_machine.end_schedule();

        Ok(Output::StreamData(stream))
    }
}

#[async_trait]
impl QueryExecution for SqlQueryExecution {
    async fn start(&self) -> QueryResult<Output> {
        let (task, abort_handle) = futures::future::abortable(self.start());

        {
            *self.abort_handle.lock() = Some(abort_handle);
        }

        task.await.map_err(|_| QueryError::Cancel)?
    }

    fn cancel(&self) -> QueryResult<()> {
        debug!(
            "cancel sql query execution: sql: {}, state: {:?}",
            self.query_state_machine.query.content(),
            self.query_state_machine.state()
        );

        // change state
        self.query_state_machine.cancel();
        // stop future task
        if let Some(e) = self.abort_handle.lock().as_ref() {
            e.abort()
        };

        debug!(
            "canceled sql query execution: sql: {}, state: {:?}",
            self.query_state_machine.query.content(),
            self.query_state_machine.state()
        );
        Ok(())
    }
}

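The cancellation scheme hinges on futures::future::abortable: the trait-level start() wraps the inherent start() in an abortable future and parks the handle where cancel() can reach it. The core pattern in isolation, not part of the diff:

// --- sketch (not part of the diff) ---
use futures::future::{abortable, Aborted};

async fn demo_abort() {
    let (task, handle) = abortable(async { 42u32 });
    handle.abort();
    // An aborted future resolves to Err(Aborted) instead of its value,
    // which the code above maps to QueryError::Cancel.
    assert_eq!(task.await, Err(Aborted));
}
// --- end sketch ---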
22
s3select/query/src/execution/scheduler/local.rs
Normal file
22
s3select/query/src/execution/scheduler/local.rs
Normal file
@@ -0,0 +1,22 @@
use std::sync::Arc;

use api::query::scheduler::{ExecutionResults, Scheduler};
use async_trait::async_trait;
use datafusion::error::DataFusionError;
use datafusion::execution::context::TaskContext;
use datafusion::physical_plan::{execute_stream, ExecutionPlan};

pub struct LocalScheduler {}

#[async_trait]
impl Scheduler for LocalScheduler {
    async fn schedule(
        &self,
        plan: Arc<dyn ExecutionPlan>,
        context: Arc<TaskContext>,
    ) -> Result<ExecutionResults, DataFusionError> {
        let stream = execute_stream(plan, context)?;

        Ok(ExecutionResults::new(stream))
    }
}
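`LocalScheduler` is a thin wrapper over DataFusion's `execute_stream`. A minimal sketch of driving that call directly with a zero-row `EmptyExec` plan, assuming `tokio` and `futures` as dependencies:

use std::sync::Arc;

use datafusion::arrow::datatypes::Schema;
use datafusion::execution::context::TaskContext;
use datafusion::physical_plan::{empty::EmptyExec, execute_stream};
use futures::TryStreamExt;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // a zero-row plan stands in for a real scan
    let plan = Arc::new(EmptyExec::new(Arc::new(Schema::empty())));
    let stream = execute_stream(plan, Arc::new(TaskContext::default()))?;

    // the stream yields Result<RecordBatch>; collect and inspect
    let batches: Vec<_> = stream.try_collect().await?;
    assert!(batches.is_empty());
    Ok(())
}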
1
s3select/query/src/execution/scheduler/mod.rs
Normal file
@@ -0,0 +1 @@
pub mod local;
1
s3select/query/src/function/mod.rs
Normal file
@@ -0,0 +1 @@
pub mod simple_func_manager;
63
s3select/query/src/function/simple_func_manager.rs
Normal file
@@ -0,0 +1,63 @@
use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use api::query::function::FunctionMetadataManager;
use api::{QueryError, QueryResult};
use datafusion::logical_expr::{AggregateUDF, ScalarUDF, WindowUDF};

pub type SimpleFunctionMetadataManagerRef = Arc<SimpleFunctionMetadataManager>;

#[derive(Debug, Default)]
pub struct SimpleFunctionMetadataManager {
    /// Scalar functions that are registered with the context
    pub scalar_functions: HashMap<String, Arc<ScalarUDF>>,
    /// Aggregate functions registered in the context
    pub aggregate_functions: HashMap<String, Arc<AggregateUDF>>,
    /// Window functions registered in the context
    pub window_functions: HashMap<String, Arc<WindowUDF>>,
}

impl FunctionMetadataManager for SimpleFunctionMetadataManager {
    fn register_udf(&mut self, f: ScalarUDF) -> QueryResult<()> {
        self.scalar_functions.insert(f.inner().name().to_uppercase(), Arc::new(f));
        Ok(())
    }

    fn register_udaf(&mut self, f: AggregateUDF) -> QueryResult<()> {
        self.aggregate_functions.insert(f.inner().name().to_uppercase(), Arc::new(f));
        Ok(())
    }

    fn register_udwf(&mut self, f: WindowUDF) -> QueryResult<()> {
        self.window_functions.insert(f.inner().name().to_uppercase(), Arc::new(f));
        Ok(())
    }

    fn udf(&self, name: &str) -> QueryResult<Arc<ScalarUDF>> {
        let result = self.scalar_functions.get(&name.to_uppercase());

        // a lookup miss reports a missing function, matching udaf/udwf below
        result
            .cloned()
            .ok_or_else(|| QueryError::FunctionNotExists { name: name.to_string() })
    }

    fn udaf(&self, name: &str) -> QueryResult<Arc<AggregateUDF>> {
        let result = self.aggregate_functions.get(&name.to_uppercase());

        result
            .cloned()
            .ok_or_else(|| QueryError::FunctionNotExists { name: name.to_string() })
    }

    fn udwf(&self, name: &str) -> QueryResult<Arc<WindowUDF>> {
        let result = self.window_functions.get(&name.to_uppercase());

        result
            .cloned()
            .ok_or_else(|| QueryError::FunctionNotExists { name: name.to_string() })
    }

    fn udfs(&self) -> HashSet<String> {
        self.scalar_functions.keys().cloned().collect()
    }
}
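Lookups are case-insensitive by construction: keys are uppercased both on insert and on fetch. A minimal standalone sketch of that registry pattern, with plain `String` values standing in for the UDF types:

use std::collections::HashMap;

// hypothetical stand-in for the UDF registry keyed by uppercased names
struct Registry {
    funcs: HashMap<String, String>,
}

impl Registry {
    fn register(&mut self, name: &str, body: &str) {
        // normalize on insert so "My_Func" and "my_func" collide by design
        self.funcs.insert(name.to_uppercase(), body.to_string());
    }

    fn lookup(&self, name: &str) -> Option<&String> {
        // normalize on lookup with the same rule
        self.funcs.get(&name.to_uppercase())
    }
}

fn main() {
    let mut r = Registry { funcs: HashMap::new() };
    r.register("my_func", "body");
    assert!(r.lookup("MY_FUNC").is_some());
    assert!(r.lookup("My_Func").is_some());
}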
164
s3select/query/src/instance.rs
Normal file
@@ -0,0 +1,164 @@
use std::sync::Arc;

use api::{
    query::{
        dispatcher::QueryDispatcher, execution::QueryStateMachineRef, logical_planner::Plan, session::SessionCtxFactory, Query,
    },
    server::dbms::{DatabaseManagerSystem, QueryHandle},
    QueryResult,
};
use async_trait::async_trait;
use derive_builder::Builder;
use s3s::dto::SelectObjectContentInput;

use crate::{
    dispatcher::manager::SimpleQueryDispatcherBuilder,
    execution::{factory::SqlQueryExecutionFactory, scheduler::local::LocalScheduler},
    function::simple_func_manager::SimpleFunctionMetadataManager,
    metadata::base_table::BaseTableProvider,
    sql::{optimizer::CascadeOptimizerBuilder, parser::DefaultParser},
};

#[derive(Builder)]
pub struct RustFSms<D: QueryDispatcher> {
    // query dispatcher & query execution
    query_dispatcher: Arc<D>,
}

#[async_trait]
impl<D> DatabaseManagerSystem for RustFSms<D>
where
    D: QueryDispatcher,
{
    async fn execute(&self, query: &Query) -> QueryResult<QueryHandle> {
        let result = self.query_dispatcher.execute_query(query).await?;

        Ok(QueryHandle::new(query.clone(), result))
    }

    async fn build_query_state_machine(&self, query: Query) -> QueryResult<QueryStateMachineRef> {
        let query_state_machine = self.query_dispatcher.build_query_state_machine(query).await?;

        Ok(query_state_machine)
    }

    async fn build_logical_plan(&self, query_state_machine: QueryStateMachineRef) -> QueryResult<Option<Plan>> {
        let logical_plan = self.query_dispatcher.build_logical_plan(query_state_machine).await?;

        Ok(logical_plan)
    }

    async fn execute_logical_plan(
        &self,
        logical_plan: Plan,
        query_state_machine: QueryStateMachineRef,
    ) -> QueryResult<QueryHandle> {
        let query = query_state_machine.query.clone();
        let result = self
            .query_dispatcher
            .execute_logical_plan(logical_plan, query_state_machine)
            .await?;

        Ok(QueryHandle::new(query.clone(), result))
    }
}

pub async fn make_rustfsms(input: SelectObjectContentInput, is_test: bool) -> QueryResult<impl DatabaseManagerSystem> {
    // init the function manager; custom UDFs can be registered here if needed
    let func_manager = SimpleFunctionMetadataManager::default();
    // TODO: the session config should be loaded from the global system config
    let session_factory = Arc::new(SessionCtxFactory { is_test });
    let parser = Arc::new(DefaultParser::default());
    let optimizer = Arc::new(CascadeOptimizerBuilder::default().build());
    // TODO: wrap this, and make num_threads configurable
    let scheduler = Arc::new(LocalScheduler {});

    let query_execution_factory = Arc::new(SqlQueryExecutionFactory::new(optimizer, scheduler));

    let default_table_provider = Arc::new(BaseTableProvider::default());

    let query_dispatcher = SimpleQueryDispatcherBuilder::default()
        .with_input(input)
        .with_func_manager(Arc::new(func_manager))
        .with_default_table_provider(default_table_provider)
        .with_session_factory(session_factory)
        .with_parser(parser)
        .with_query_execution_factory(query_execution_factory)
        .build()?;

    let mut builder = RustFSmsBuilder::default();

    let db_server = builder.query_dispatcher(query_dispatcher).build().expect("build db server");

    Ok(db_server)
}

#[cfg(test)]
mod tests {
    use api::{
        query::{Context, Query},
        server::dbms::DatabaseManagerSystem,
    };
    use datafusion::{arrow::util::pretty, assert_batches_eq};
    use s3s::dto::{
        CSVInput, CSVOutput, ExpressionType, InputSerialization, OutputSerialization, SelectObjectContentInput,
        SelectObjectContentRequest,
    };

    use crate::instance::make_rustfsms;

    #[tokio::test]
    #[ignore]
    async fn test_simple_sql() {
        let sql = "select * from S3Object";
        let input = SelectObjectContentInput {
            bucket: "dandan".to_string(),
            expected_bucket_owner: None,
            key: "test.csv".to_string(),
            sse_customer_algorithm: None,
            sse_customer_key: None,
            sse_customer_key_md5: None,
            request: SelectObjectContentRequest {
                expression: sql.to_string(),
                expression_type: ExpressionType::from_static("SQL"),
                input_serialization: InputSerialization {
                    csv: Some(CSVInput::default()),
                    ..Default::default()
                },
                output_serialization: OutputSerialization {
                    csv: Some(CSVOutput::default()),
                    ..Default::default()
                },
                request_progress: None,
                scan_range: None,
            },
        };
        let db = make_rustfsms(input.clone(), true).await.unwrap();
        let query = Query::new(Context { input }, sql.to_string());

        let result = db.execute(&query).await.unwrap();

        let results = result.result().chunk_result().await.unwrap().to_vec();

        let expected = [
            "+----------------+----------+----------+------------+----------+",
            "| column_1       | column_2 | column_3 | column_4   | column_5 |",
            "+----------------+----------+----------+------------+----------+",
            "| id             | name     | age      | department | salary   |",
            "| 1              | Alice    | 25       | HR         | 5000     |",
            "| 2              | Bob      | 30       | IT         | 6000     |",
            "| 3              | Charlie  | 35       | Finance    | 7000     |",
            "| 4              | Diana    | 22       | Marketing  | 4500     |",
            "| 5              | Eve      | 28       | IT         | 5500     |",
            "| 6              | Frank    | 40       | Finance    | 8000     |",
            "| 7              | Grace    | 26       | HR         | 5200     |",
            "| 8              | Henry    | 32       | IT         | 6200     |",
            "| 9              | Ivy      | 24       | Marketing  | 4800     |",
            "| 10             | Jack     | 38       | Finance    | 7500     |",
            "+----------------+----------+----------+------------+----------+",
        ];

        assert_batches_eq!(expected, &results);
        pretty::print_batches(&results).unwrap();
    }
}
7
s3select/query/src/lib.rs
Normal file
@@ -0,0 +1,7 @@
pub mod data_source;
pub mod dispatcher;
pub mod execution;
pub mod function;
pub mod instance;
pub mod metadata;
pub mod sql;
17
s3select/query/src/metadata/base_table.rs
Normal file
@@ -0,0 +1,17 @@
use std::sync::Arc;

use datafusion::common::Result as DFResult;
use datafusion::datasource::listing::ListingTable;

use crate::data_source::table_source::TableHandle;

use super::TableHandleProvider;

#[derive(Default)]
pub struct BaseTableProvider {}

impl TableHandleProvider for BaseTableProvider {
    fn build_table_handle(&self, provider: Arc<ListingTable>) -> DFResult<TableHandle> {
        Ok(TableHandle::External(provider))
    }
}
126
s3select/query/src/metadata/mod.rs
Normal file
@@ -0,0 +1,126 @@
use std::sync::Arc;

use api::query::{function::FuncMetaManagerRef, session::SessionCtx};
use async_trait::async_trait;
use datafusion::arrow::datatypes::DataType;
use datafusion::common::Result as DFResult;
use datafusion::datasource::listing::ListingTable;
use datafusion::logical_expr::var_provider::is_system_variables;
use datafusion::logical_expr::{AggregateUDF, ScalarUDF, TableSource, WindowUDF};
use datafusion::variable::VarType;
use datafusion::{
    config::ConfigOptions,
    sql::{planner::ContextProvider, TableReference},
};

use crate::data_source::table_source::{TableHandle, TableSourceAdapter};

pub mod base_table;

#[async_trait]
pub trait ContextProviderExtension: ContextProvider {
    fn get_table_source_(&self, name: TableReference) -> datafusion::common::Result<Arc<TableSourceAdapter>>;
}

pub type TableHandleProviderRef = Arc<dyn TableHandleProvider + Send + Sync>;

pub trait TableHandleProvider {
    fn build_table_handle(&self, provider: Arc<ListingTable>) -> DFResult<TableHandle>;
}

pub struct MetadataProvider {
    provider: Arc<ListingTable>,
    session: SessionCtx,
    config_options: ConfigOptions,
    func_manager: FuncMetaManagerRef,
    current_session_table_provider: TableHandleProviderRef,
}

impl MetadataProvider {
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        provider: Arc<ListingTable>,
        current_session_table_provider: TableHandleProviderRef,
        func_manager: FuncMetaManagerRef,
        session: SessionCtx,
    ) -> Self {
        Self {
            provider,
            current_session_table_provider,
            config_options: session.inner().config_options().clone(),
            session,
            func_manager,
        }
    }

    fn build_table_handle(&self) -> datafusion::common::Result<TableHandle> {
        self.current_session_table_provider.build_table_handle(self.provider.clone())
    }
}

impl ContextProviderExtension for MetadataProvider {
    fn get_table_source_(&self, table_ref: TableReference) -> datafusion::common::Result<Arc<TableSourceAdapter>> {
        let name = table_ref.clone().resolve("", "");
        let table_name = &*name.table;

        let table_handle = self.build_table_handle()?;

        Ok(Arc::new(TableSourceAdapter::try_new(table_ref.clone(), table_name, table_handle)?))
    }
}

impl ContextProvider for MetadataProvider {
    fn get_function_meta(&self, name: &str) -> Option<Arc<ScalarUDF>> {
        self.func_manager
            .udf(name)
            .ok()
            .or(self.session.inner().scalar_functions().get(name).cloned())
    }

    fn get_aggregate_meta(&self, name: &str) -> Option<Arc<AggregateUDF>> {
        self.func_manager.udaf(name).ok()
    }

    fn get_variable_type(&self, variable_names: &[String]) -> Option<DataType> {
        if variable_names.is_empty() {
            return None;
        }

        let var_type = if is_system_variables(variable_names) {
            VarType::System
        } else {
            VarType::UserDefined
        };

        self.session
            .inner()
            .execution_props()
            .get_var_provider(var_type)
            .and_then(|p| p.get_type(variable_names))
    }

    fn options(&self) -> &ConfigOptions {
        // TODO refactor
        &self.config_options
    }

    fn get_window_meta(&self, name: &str) -> Option<Arc<WindowUDF>> {
        self.func_manager.udwf(name).ok()
    }

    fn get_table_source(&self, name: TableReference) -> DFResult<Arc<dyn TableSource>> {
        Ok(self.get_table_source_(name)?)
    }

    fn udf_names(&self) -> Vec<String> {
        todo!()
    }

    fn udaf_names(&self) -> Vec<String> {
        todo!()
    }

    fn udwf_names(&self) -> Vec<String> {
        todo!()
    }
}
33
s3select/query/src/sql/analyzer.rs
Normal file
@@ -0,0 +1,33 @@
use api::query::analyzer::Analyzer;
use api::query::session::SessionCtx;
use api::QueryResult;
use datafusion::logical_expr::LogicalPlan;
use datafusion::optimizer::analyzer::Analyzer as DFAnalyzer;

pub struct DefaultAnalyzer {
    inner: DFAnalyzer,
}

impl DefaultAnalyzer {
    pub fn new() -> Self {
        let analyzer = DFAnalyzer::default();
        // additional analyzer rules can be added here

        Self { inner: analyzer }
    }
}

impl Default for DefaultAnalyzer {
    fn default() -> Self {
        Self::new()
    }
}

impl Analyzer for DefaultAnalyzer {
    fn analyze(&self, plan: &LogicalPlan, session: &SessionCtx) -> QueryResult<LogicalPlan> {
        let plan = self
            .inner
            .execute_and_check(plan.to_owned(), session.inner().config_options(), |_, _| {})?;
        Ok(plan)
    }
}
18
s3select/query/src/sql/dialect.rs
Normal file
@@ -0,0 +1,18 @@
use datafusion::sql::sqlparser::dialect::Dialect;

#[derive(Debug, Default)]
pub struct RustFsDialect;

impl Dialect for RustFsDialect {
    fn is_identifier_start(&self, ch: char) -> bool {
        ch.is_alphabetic() || ch == '_' || ch == '#' || ch == '@'
    }

    fn is_identifier_part(&self, ch: char) -> bool {
        ch.is_alphabetic() || ch.is_ascii_digit() || ch == '@' || ch == '$' || ch == '#' || ch == '_'
    }

    fn supports_group_by_expr(&self) -> bool {
        true
    }
}
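The effect of these overrides is that `@`- and `#`-prefixed names tokenize as ordinary identifiers. A self-contained sketch, duplicating the dialect rules locally so it runs on its own:

use datafusion::sql::sqlparser::dialect::Dialect;
use datafusion::sql::sqlparser::tokenizer::{Token, Tokenizer};

// a local copy of the dialect rules above, so this sketch stands alone
#[derive(Debug, Default)]
struct SketchDialect;

impl Dialect for SketchDialect {
    fn is_identifier_start(&self, ch: char) -> bool {
        ch.is_alphabetic() || ch == '_' || ch == '#' || ch == '@'
    }
    fn is_identifier_part(&self, ch: char) -> bool {
        ch.is_alphabetic() || ch.is_ascii_digit() || ch == '@' || ch == '$' || ch == '#' || ch == '_'
    }
}

fn main() {
    let tokens = Tokenizer::new(&SketchDialect, "SELECT @id, #tag FROM t")
        .tokenize()
        .expect("tokenize");
    // SELECT, @id, #tag, FROM, and t all tokenize as Word tokens
    let words = tokens.iter().filter(|t| matches!(t, Token::Word(_))).count();
    assert_eq!(words, 5);
}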
2
s3select/query/src/sql/logical/mod.rs
Normal file
@@ -0,0 +1,2 @@
pub mod optimizer;
pub mod planner;
111
s3select/query/src/sql/logical/optimizer.rs
Normal file
@@ -0,0 +1,111 @@
use std::sync::Arc;

use api::{
    query::{analyzer::AnalyzerRef, logical_planner::QueryPlan, session::SessionCtx},
    QueryResult,
};
use datafusion::{
    execution::SessionStateBuilder,
    logical_expr::LogicalPlan,
    optimizer::{
        common_subexpr_eliminate::CommonSubexprEliminate, decorrelate_predicate_subquery::DecorrelatePredicateSubquery,
        eliminate_cross_join::EliminateCrossJoin, eliminate_duplicated_expr::EliminateDuplicatedExpr,
        eliminate_filter::EliminateFilter, eliminate_join::EliminateJoin, eliminate_limit::EliminateLimit,
        eliminate_outer_join::EliminateOuterJoin, extract_equijoin_predicate::ExtractEquijoinPredicate,
        filter_null_join_keys::FilterNullJoinKeys, propagate_empty_relation::PropagateEmptyRelation,
        push_down_filter::PushDownFilter, push_down_limit::PushDownLimit,
        replace_distinct_aggregate::ReplaceDistinctWithAggregate, scalar_subquery_to_join::ScalarSubqueryToJoin,
        simplify_expressions::SimplifyExpressions, single_distinct_to_groupby::SingleDistinctToGroupBy,
        unwrap_cast_in_comparison::UnwrapCastInComparison, OptimizerRule,
    },
};
use tracing::debug;

use crate::sql::analyzer::DefaultAnalyzer;

pub trait LogicalOptimizer: Send + Sync {
    fn optimize(&self, plan: &QueryPlan, session: &SessionCtx) -> QueryResult<LogicalPlan>;

    fn inject_optimizer_rule(&mut self, optimizer_rule: Arc<dyn OptimizerRule + Send + Sync>);
}

pub struct DefaultLogicalOptimizer {
    // fit datafusion
    // TODO refactor
    analyzer: AnalyzerRef,
    rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
}

impl DefaultLogicalOptimizer {
    #[allow(dead_code)]
    fn with_optimizer_rules(mut self, rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>) -> Self {
        self.rules = rules;
        self
    }
}

impl Default for DefaultLogicalOptimizer {
    fn default() -> Self {
        let analyzer = Arc::new(DefaultAnalyzer::default());

        // additional optimizer rules
        let rules: Vec<Arc<dyn OptimizerRule + Send + Sync>> = vec![
            // df default rules start
            Arc::new(SimplifyExpressions::new()),
            Arc::new(UnwrapCastInComparison::new()),
            Arc::new(ReplaceDistinctWithAggregate::new()),
            Arc::new(EliminateJoin::new()),
            Arc::new(DecorrelatePredicateSubquery::new()),
            Arc::new(ScalarSubqueryToJoin::new()),
            Arc::new(ExtractEquijoinPredicate::new()),
            // SimplifyExpressions does not simplify expressions in subqueries, so we
            // run it again after the optimizations that potentially converted
            // subqueries to joins
            Arc::new(SimplifyExpressions::new()),
            Arc::new(EliminateDuplicatedExpr::new()),
            Arc::new(EliminateFilter::new()),
            Arc::new(EliminateCrossJoin::new()),
            Arc::new(CommonSubexprEliminate::new()),
            Arc::new(EliminateLimit::new()),
            Arc::new(PropagateEmptyRelation::new()),
            Arc::new(FilterNullJoinKeys::default()),
            Arc::new(EliminateOuterJoin::new()),
            // Filters can't be pushed down past Limits, so PushDownFilter must run after PushDownLimit
            Arc::new(PushDownLimit::new()),
            Arc::new(PushDownFilter::new()),
            Arc::new(SingleDistinctToGroupBy::new()),
            // the previous optimizations added expressions and projections
            // that might benefit from the following rules
            Arc::new(SimplifyExpressions::new()),
            Arc::new(UnwrapCastInComparison::new()),
            Arc::new(CommonSubexprEliminate::new()),
            // PushDownProjection can push Projections through Limits, so run PushDownLimit again
            Arc::new(PushDownLimit::new()),
            // df default rules end
            // custom rules can be added here
        ];

        Self { analyzer, rules }
    }
}

impl LogicalOptimizer for DefaultLogicalOptimizer {
    fn optimize(&self, plan: &QueryPlan, session: &SessionCtx) -> QueryResult<LogicalPlan> {
        let analyzed_plan = self.analyzer.analyze(&plan.df_plan, session)?;

        debug!("Analyzed logical plan:\n{}\n", analyzed_plan.display_indent_schema());

        let optimized_plan = SessionStateBuilder::new_from_existing(session.inner().clone())
            .with_optimizer_rules(self.rules.clone())
            .build()
            .optimize(&analyzed_plan)?;

        Ok(optimized_plan)
    }

    fn inject_optimizer_rule(&mut self, optimizer_rule: Arc<dyn OptimizerRule + Send + Sync>) {
        self.rules.push(optimizer_rule);
    }
}
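For intuition about what one pass in this pipeline does, here is a minimal, self-contained sketch that runs only `SimplifyExpressions` over a trivial plan and constant-folds `1 + 2`, assuming just the `datafusion` dependency:

use std::sync::Arc;

use datafusion::error::Result;
use datafusion::logical_expr::{lit, LogicalPlanBuilder};
use datafusion::optimizer::simplify_expressions::SimplifyExpressions;
use datafusion::optimizer::{Optimizer, OptimizerContext, OptimizerRule};

fn main() -> Result<()> {
    // a one-row projection whose expression is a foldable constant
    let plan = LogicalPlanBuilder::empty(true)
        .project(vec![(lit(1) + lit(2)).alias("three")])?
        .build()?;

    // run a single rule instead of the full default pipeline
    let rules: Vec<Arc<dyn OptimizerRule + Send + Sync>> = vec![Arc::new(SimplifyExpressions::new())];
    let optimized = Optimizer::with_rules(rules).optimize(plan, &OptimizerContext::new(), |_, _| {})?;

    // the projection now carries the literal 3
    println!("{}", optimized.display_indent());
    Ok(())
}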
3
s3select/query/src/sql/logical/planner.rs
Normal file
@@ -0,0 +1,3 @@
use crate::sql::planner::SqlPlanner;

pub type DefaultLogicalPlanner<'a, S> = SqlPlanner<'a, S>;
7
s3select/query/src/sql/mod.rs
Normal file
@@ -0,0 +1,7 @@
pub mod analyzer;
pub mod dialect;
pub mod logical;
pub mod optimizer;
pub mod parser;
pub mod physical;
pub mod planner;
82
s3select/query/src/sql/optimizer.rs
Normal file
@@ -0,0 +1,82 @@
use std::sync::Arc;

use api::{
    query::{logical_planner::QueryPlan, optimizer::Optimizer, physical_planner::PhysicalPlanner, session::SessionCtx},
    QueryResult,
};
use async_trait::async_trait;
use datafusion::physical_plan::{displayable, ExecutionPlan};
use tracing::debug;

use super::{
    logical::optimizer::{DefaultLogicalOptimizer, LogicalOptimizer},
    physical::{optimizer::PhysicalOptimizer, planner::DefaultPhysicalPlanner},
};

pub struct CascadeOptimizer {
    logical_optimizer: Arc<dyn LogicalOptimizer + Send + Sync>,
    physical_planner: Arc<dyn PhysicalPlanner + Send + Sync>,
    physical_optimizer: Arc<dyn PhysicalOptimizer + Send + Sync>,
}

#[async_trait]
impl Optimizer for CascadeOptimizer {
    async fn optimize(&self, plan: &QueryPlan, session: &SessionCtx) -> QueryResult<Arc<dyn ExecutionPlan>> {
        debug!("Original logical plan:\n{}\n", plan.df_plan.display_indent_schema());

        let optimized_logical_plan = self.logical_optimizer.optimize(plan, session)?;

        debug!("Final logical plan:\n{}\n", optimized_logical_plan.display_indent_schema());

        let physical_plan = self
            .physical_planner
            .create_physical_plan(&optimized_logical_plan, session)
            .await?;

        debug!("Original physical plan:\n{}\n", displayable(physical_plan.as_ref()).indent(false));

        let optimized_physical_plan = self.physical_optimizer.optimize(physical_plan, session)?;

        Ok(optimized_physical_plan)
    }
}

#[derive(Default)]
pub struct CascadeOptimizerBuilder {
    logical_optimizer: Option<Arc<dyn LogicalOptimizer + Send + Sync>>,
    physical_planner: Option<Arc<dyn PhysicalPlanner + Send + Sync>>,
    physical_optimizer: Option<Arc<dyn PhysicalOptimizer + Send + Sync>>,
}

impl CascadeOptimizerBuilder {
    pub fn with_logical_optimizer(mut self, logical_optimizer: Arc<dyn LogicalOptimizer + Send + Sync>) -> Self {
        self.logical_optimizer = Some(logical_optimizer);
        self
    }

    pub fn with_physical_planner(mut self, physical_planner: Arc<dyn PhysicalPlanner + Send + Sync>) -> Self {
        self.physical_planner = Some(physical_planner);
        self
    }

    pub fn with_physical_optimizer(mut self, physical_optimizer: Arc<dyn PhysicalOptimizer + Send + Sync>) -> Self {
        self.physical_optimizer = Some(physical_optimizer);
        self
    }

    pub fn build(self) -> CascadeOptimizer {
        let default_logical_optimizer = Arc::new(DefaultLogicalOptimizer::default());
        let default_physical_planner = Arc::new(DefaultPhysicalPlanner::default());

        let logical_optimizer = self.logical_optimizer.unwrap_or(default_logical_optimizer);
        let physical_planner = self.physical_planner.unwrap_or_else(|| default_physical_planner.clone());
        let physical_optimizer = self.physical_optimizer.unwrap_or(default_physical_planner);

        CascadeOptimizer {
            logical_optimizer,
            physical_planner,
            physical_optimizer,
        }
    }
}
92
s3select/query/src/sql/parser.rs
Normal file
@@ -0,0 +1,92 @@
use std::{collections::VecDeque, fmt::Display};

use api::{
    query::{ast::ExtStatement, parser::Parser as RustFsParser},
    ParserSnafu,
};
use datafusion::sql::sqlparser::{
    dialect::Dialect,
    parser::{Parser, ParserError},
    tokenizer::{Token, Tokenizer},
};
use snafu::ResultExt;

use super::dialect::RustFsDialect;

pub type Result<T, E = ParserError> = std::result::Result<T, E>;

// Use `Parser::expected` instead, if possible
macro_rules! parser_err {
    ($MSG:expr) => {
        Err(ParserError::ParserError($MSG.to_string()))
    };
}

#[derive(Default)]
pub struct DefaultParser {}

impl RustFsParser for DefaultParser {
    fn parse(&self, sql: &str) -> api::QueryResult<VecDeque<ExtStatement>> {
        ExtParser::parse_sql(sql).context(ParserSnafu)
    }
}

/// SQL Parser
pub struct ExtParser<'a> {
    parser: Parser<'a>,
}

impl<'a> ExtParser<'a> {
    /// Create a parser for the given SQL text, tokenized with the given dialect
    fn new_with_dialect(sql: &str, dialect: &'a dyn Dialect) -> Result<Self> {
        let mut tokenizer = Tokenizer::new(dialect, sql);
        let tokens = tokenizer.tokenize()?;
        Ok(ExtParser {
            parser: Parser::new(dialect).with_tokens(tokens),
        })
    }

    /// Parse a SQL string into a set of statements, using the default dialect
    pub fn parse_sql(sql: &str) -> Result<VecDeque<ExtStatement>> {
        let dialect = &RustFsDialect {};
        ExtParser::parse_sql_with_dialect(sql, dialect)
    }

    /// Parse a SQL string into a set of statements, using the given dialect
    pub fn parse_sql_with_dialect(sql: &str, dialect: &dyn Dialect) -> Result<VecDeque<ExtStatement>> {
        let mut parser = ExtParser::new_with_dialect(sql, dialect)?;
        let mut stmts = VecDeque::new();
        let mut expecting_statement_delimiter = false;
        loop {
            // ignore empty statements (between successive statement delimiters)
            while parser.parser.consume_token(&Token::SemiColon) {
                expecting_statement_delimiter = false;
            }

            if parser.parser.peek_token() == Token::EOF {
                break;
            }
            if expecting_statement_delimiter {
                return parser.expected("end of statement", parser.parser.peek_token());
            }

            let statement = parser.parse_statement()?;
            stmts.push_back(statement);
            expecting_statement_delimiter = true;
        }

        // debug!("Parser sql: {}, stmts: {:#?}", sql, stmts);

        Ok(stmts)
    }

    /// Parse a single statement
    fn parse_statement(&mut self) -> Result<ExtStatement> {
        Ok(ExtStatement::SqlStatement(Box::new(self.parser.parse_statement()?)))
    }

    // Report an unexpected token
    fn expected<T>(&self, expected: &str, found: impl Display) -> Result<T> {
        parser_err!(format!("Expected {}, found: {}", expected, found))
    }
}
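A minimal usage sketch of the parser, exercising the empty-statement and delimiter handling above; the `query::` path assumes the crate layout introduced in this change:

use query::sql::parser::ExtParser;

fn main() {
    // two statements; the doubled semicolon is skipped as an empty statement
    let stmts = ExtParser::parse_sql("SELECT 1;; SELECT 2").expect("parse");
    assert_eq!(stmts.len(), 2);

    // a missing delimiter between statements is reported as an error
    assert!(ExtParser::parse_sql("SELECT 1 SELECT 2").is_err());
}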
2
s3select/query/src/sql/physical/mod.rs
Normal file
@@ -0,0 +1,2 @@
pub mod optimizer;
pub mod planner;
12
s3select/query/src/sql/physical/optimizer.rs
Normal file
@@ -0,0 +1,12 @@
use std::sync::Arc;

use api::query::session::SessionCtx;
use api::QueryResult;
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::ExecutionPlan;

pub trait PhysicalOptimizer {
    fn optimize(&self, plan: Arc<dyn ExecutionPlan>, session: &SessionCtx) -> QueryResult<Arc<dyn ExecutionPlan>>;

    fn inject_optimizer_rule(&mut self, optimizer_rule: Arc<dyn PhysicalOptimizerRule + Send + Sync>);
}
104
s3select/query/src/sql/physical/planner.rs
Normal file
@@ -0,0 +1,104 @@
use std::sync::Arc;

use api::query::physical_planner::PhysicalPlanner;
use api::query::session::SessionCtx;
use api::QueryResult;
use async_trait::async_trait;
use datafusion::execution::SessionStateBuilder;
use datafusion::logical_expr::LogicalPlan;
use datafusion::physical_optimizer::aggregate_statistics::AggregateStatistics;
use datafusion::physical_optimizer::coalesce_batches::CoalesceBatches;
use datafusion::physical_optimizer::join_selection::JoinSelection;
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_planner::{
    DefaultPhysicalPlanner as DFDefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner as DFPhysicalPlanner,
};

use super::optimizer::PhysicalOptimizer;

pub struct DefaultPhysicalPlanner {
    ext_physical_transform_rules: Vec<Arc<dyn ExtensionPlanner + Send + Sync>>,
    /// Responsible for optimizing a physical execution plan
    ext_physical_optimizer_rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>,
}

impl DefaultPhysicalPlanner {
    #[allow(dead_code)]
    fn with_physical_transform_rules(mut self, rules: Vec<Arc<dyn ExtensionPlanner + Send + Sync>>) -> Self {
        self.ext_physical_transform_rules = rules;
        self
    }
}

impl DefaultPhysicalPlanner {
    #[allow(dead_code)]
    fn with_optimizer_rules(mut self, rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>) -> Self {
        self.ext_physical_optimizer_rules = rules;
        self
    }
}

impl Default for DefaultPhysicalPlanner {
    fn default() -> Self {
        let ext_physical_transform_rules: Vec<Arc<dyn ExtensionPlanner + Send + Sync>> = vec![
            // rules can be added here
        ];

        // We need to take care of the rule ordering. They may influence each other.
        let ext_physical_optimizer_rules: Vec<Arc<dyn PhysicalOptimizerRule + Sync + Send>> = vec![
            Arc::new(AggregateStatistics::new()),
            // Statistics-based join selection will change the Auto mode to a real join implementation,
            // like collect left, or hash join, or future sort merge join, which will influence the
            // EnforceDistribution and EnforceSorting rules as they decide whether to add additional
            // repartitioning and local sorting steps to meet distribution and ordering requirements.
            // Therefore, it should run before EnforceDistribution and EnforceSorting.
            Arc::new(JoinSelection::new()),
            // The CoalesceBatches rule will not influence the distribution and ordering of the
            // whole plan tree. Therefore, to avoid influencing other rules, it should run last.
            Arc::new(CoalesceBatches::new()),
        ];

        Self {
            ext_physical_transform_rules,
            ext_physical_optimizer_rules,
        }
    }
}

#[async_trait]
impl PhysicalPlanner for DefaultPhysicalPlanner {
    async fn create_physical_plan(
        &self,
        logical_plan: &LogicalPlan,
        session: &SessionCtx,
    ) -> QueryResult<Arc<dyn ExecutionPlan>> {
        // Inject the extended physical-optimizer rules into DataFusion's session state
        let new_state = SessionStateBuilder::new_from_existing(session.inner().clone())
            .with_physical_optimizer_rules(self.ext_physical_optimizer_rules.clone())
            .build();

        // Build DataFusion's physical planner from the extended physical-transform rules
        let planner = DFDefaultPhysicalPlanner::with_extension_planners(self.ext_physical_transform_rules.clone());

        // Run DataFusion's physical planning and optimization
        planner
            .create_physical_plan(logical_plan, &new_state)
            .await
            .map_err(|e| e.into())
    }

    fn inject_physical_transform_rule(&mut self, rule: Arc<dyn ExtensionPlanner + Send + Sync>) {
        self.ext_physical_transform_rules.push(rule)
    }
}

impl PhysicalOptimizer for DefaultPhysicalPlanner {
    fn optimize(&self, plan: Arc<dyn ExecutionPlan>, _session: &SessionCtx) -> QueryResult<Arc<dyn ExecutionPlan>> {
        Ok(plan)
    }

    fn inject_optimizer_rule(&mut self, optimizer_rule: Arc<dyn PhysicalOptimizerRule + Send + Sync>) {
        self.ext_physical_optimizer_rules.push(optimizer_rule);
    }
}
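The key move in `create_physical_plan` is rebuilding the session state so that exactly the injected physical rules apply. A minimal sketch of that builder pattern on a fresh state, assuming only the `datafusion` dependency:

use std::sync::Arc;

use datafusion::execution::SessionStateBuilder;
use datafusion::physical_optimizer::coalesce_batches::CoalesceBatches;
use datafusion::prelude::SessionContext;

fn main() {
    // a state whose physical optimizer runs exactly one rule
    let state = SessionStateBuilder::new()
        .with_default_features()
        .with_physical_optimizer_rules(vec![Arc::new(CoalesceBatches::new())])
        .build();

    // the context now plans and optimizes with only the injected rule
    let _ctx = SessionContext::new_with_state(state);
}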
60
s3select/query/src/sql/planner.rs
Normal file
@@ -0,0 +1,60 @@
use api::{
    query::{
        ast::ExtStatement,
        logical_planner::{LogicalPlanner, Plan, QueryPlan},
        session::SessionCtx,
    },
    QueryError, QueryResult,
};
use async_recursion::async_recursion;
use async_trait::async_trait;
use datafusion::sql::{planner::SqlToRel, sqlparser::ast::Statement};

use crate::metadata::ContextProviderExtension;

pub struct SqlPlanner<'a, S: ContextProviderExtension> {
    _schema_provider: &'a S,
    df_planner: SqlToRel<'a, S>,
}

#[async_trait]
impl<S: ContextProviderExtension + Send + Sync> LogicalPlanner for SqlPlanner<'_, S> {
    async fn create_logical_plan(&self, statement: ExtStatement, session: &SessionCtx) -> QueryResult<Plan> {
        let plan = self.statement_to_plan(statement, session).await?;

        Ok(plan)
    }
}

impl<'a, S: ContextProviderExtension + Send + Sync + 'a> SqlPlanner<'a, S> {
    /// Create a new query planner
    pub fn new(schema_provider: &'a S) -> Self {
        SqlPlanner {
            _schema_provider: schema_provider,
            df_planner: SqlToRel::new(schema_provider),
        }
    }

    /// Generate a logical plan from an extended SQL statement
    #[async_recursion]
    pub(crate) async fn statement_to_plan(&self, statement: ExtStatement, session: &SessionCtx) -> QueryResult<Plan> {
        match statement {
            ExtStatement::SqlStatement(stmt) => self.df_sql_to_plan(*stmt, session).await,
        }
    }

    async fn df_sql_to_plan(&self, stmt: Statement, _session: &SessionCtx) -> QueryResult<Plan> {
        match stmt {
            Statement::Query(_) => {
                let df_plan = self.df_planner.sql_statement_to_plan(stmt)?;
                let plan = Plan::Query(QueryPlan {
                    df_plan,
                    is_tag_scan: false,
                });

                Ok(plan)
            }
            _ => Err(QueryError::NotImplemented { err: stmt.to_string() }),
        }
    }
}