Merge branch 'main' of github.com:rustfs/s3-rustfs into feature/observability

# Conflicts:
#	Cargo.lock
#	Cargo.toml
houseme
2025-03-31 15:12:30 +08:00
55 changed files with 3970 additions and 119 deletions

View File

@@ -3,11 +3,11 @@ name: Build
on:
workflow_dispatch:
schedule:
- cron: '0 0 * * 0' # at midnight of each sunday
- cron: "0 0 * * 0" # at midnight of each sunday
push:
branches:
- main
tags: [ 'v*', '*' ]
tags: ["v*", "*"]
jobs:
build-rustfs:
@@ -16,9 +16,17 @@ jobs:
strategy:
matrix:
variant:
- { profile: dev, target: x86_64-unknown-linux-gnu, glibc: "default" }
- { profile: release, target: x86_64-unknown-linux-gnu, glibc: "default" }
- { profile: release, target: x86_64-unknown-linux-gnu, glibc: "2.31" }
- { profile: dev, target: x86_64-unknown-linux-gnu, glibc: "default" }
- {
profile: release,
target: x86_64-unknown-linux-gnu,
glibc: "default",
}
- {
profile: release,
target: x86_64-unknown-linux-gnu,
glibc: "2.31",
}
steps:
- uses: actions/checkout@v4
@@ -51,13 +59,13 @@ jobs:
ARTIFACT_NAME="${ARTIFACT_NAME}-glibc${{ matrix.variant.glibc }}"
fi
echo "artifact_name=${ARTIFACT_NAME}" >> $GITHUB_OUTPUT
# Determine binary path
bin_path="target/artifacts/rustfs.${{ matrix.variant.profile }}.${{ matrix.variant.target }}.bin"
if [ -f "target/artifacts/rustfs.${{ matrix.variant.profile }}.${{ matrix.variant.target }}.glibc${{ matrix.variant.glibc }}.bin" ]; then
bin_path="target/artifacts/rustfs.${{ matrix.variant.profile }}.${{ matrix.variant.target }}.glibc${{ matrix.variant.glibc }}.bin"
fi
# Create package
mkdir -p ${ARTIFACT_NAME}
cp "$bin_path" ${ARTIFACT_NAME}/rustfs
@@ -69,6 +77,16 @@ jobs:
name: ${{ steps.package.outputs.artifact_name }}
path: ${{ steps.package.outputs.artifact_name }}.zip
retention-days: 7
- name: Upload to Aliyun OSS
uses: JohnGuan/oss-upload-action@main
with:
key-id: ${{ secrets.ALICLOUDOSS_KEY_ID }}
key-secret: ${{ secrets.ALICLOUDOSS_KEY_SECRET }}
region: oss-cn-beijing
bucket: rustfs-artifacts
assets: |
${{ steps.package.outputs.artifact_name }}.zip:/artifacts/rustfs/${{ steps.package.outputs.artifact_name }}.zip
${{ steps.package.outputs.artifact_name }}.zip:/artifacts/rustfs/${{ steps.package.outputs.artifact_name }}.latest.zip
build-rustfs-gui:
runs-on: ubuntu-latest
@@ -88,7 +106,7 @@ jobs:
name: "rustfs-${{ matrix.variant.profile }}-${{ matrix.variant.target }}"
- name: Display structure of downloaded files
run: |
ls -R
ls -R
unzip -o -j "rustfs-${{ matrix.variant.profile }}-${{ matrix.variant.target }}.zip" -d ./cli/rustfs-gui/embedded-rustfs/
ls -la cli/rustfs-gui/embedded-rustfs
- name: Cache dioxus-cli
@@ -108,12 +126,12 @@ jobs:
- name: Build and Bundle rustfs-gui
run: |
ls -la
release_path="target/${{ matrix.variant.target }}"
mkdir -p ${release_path}
cd cli/rustfs-gui
ls -la embedded-rustfs
# Configure the linker based on the target
case "${{ matrix.target }}" in
"x86_64-unknown-linux-gnu")
@@ -140,7 +158,7 @@ jobs:
# Validating Environment Variables (for Debugging)
echo "CC for ${{ matrix.target }}: $CC_${{ matrix.target }}"
echo "Linker for ${{ matrix.target }}: $CARGO_TARGET_${{ matrix.target }}_LINKER"
if [[ "${{ matrix.variant.target }}" == *"apple-darwin"* ]]; then
dx bundle --platform macos --package-types "macos" --package-types "dmg" --package-types "ios" --release --profile release --out-dir ../../${release_path}
elif [[ "${{ matrix.variant.target }}" == *"windows-msvc"* ]]; then
@@ -159,13 +177,23 @@ jobs:
name: ${{ steps.package.outputs.gui_artifact_name }}
path: ${{ steps.package.outputs.gui_artifact_name }}.zip
retention-days: 7
- name: Upload to Aliyun OSS
uses: JohnGuan/oss-upload-action@main
with:
key-id: ${{ secrets.ALICLOUDOSS_KEY_ID }}
key-secret: ${{ secrets.ALICLOUDOSS_KEY_SECRET }}
region: oss-cn-beijing
bucket: rustfs-artifacts
assets: |
${{ steps.package.outputs.gui_artifact_name }}.zip:/artifacts/rustfs/${{ steps.package.outputs.gui_artifact_name }}.zip
${{ steps.package.outputs.gui_artifact_name }}.zip:/artifacts/rustfs/${{ steps.package.outputs.gui_artifact_name }}.latest.zip
merge:
runs-on: ubuntu-latest
needs: [ build-rustfs, build-rustfs-gui ]
needs: [build-rustfs, build-rustfs-gui]
steps:
- uses: actions/upload-artifact/merge@v4
with:
name: rustfs-packages
pattern: 'rustfs-*'
pattern: "rustfs-*"
delete-merged: true

Cargo.lock generated (1552 lines)

File diff suppressed because it is too large.

View File

@@ -12,6 +12,8 @@ members = [
"crypto", # Cryptography and security features
"cli/rustfs-gui", # Graphical user interface client
"packages/obs", # Observability utilities
"s3select/api",
"s3select/query",
]
resolver = "2"
@@ -30,6 +32,7 @@ all = "warn"
[workspace.dependencies]
madmin = { path = "./madmin" }
async-recursion = "1.0.5"
async-trait = "0.1.87"
backon = "1.3.0"
bytes = "1.9.0"
@@ -37,6 +40,8 @@ bytesize = "1.3.0"
chrono = { version = "0.4.40", features = ["serde"] }
clap = { version = "4.5.31", features = ["derive", "env"] }
config = "0.15.9"
datafusion = "46.0.0"
derive_builder = "0.20.2"
dioxus = { version = "0.6.3", features = ["router"] }
dirs = "6.0.0"
ecstore = { path = "./ecstore" }
@@ -44,7 +49,7 @@ flatbuffers = "24.12.23"
futures = "0.3.31"
futures-util = "0.3.31"
common = { path = "./common/common" }
policy = {path = "./policy"}
policy = { path = "./policy" }
hex = "0.4.3"
hyper = "1.6.0"
hyper-util = { version = "0.1.10", features = [
@@ -107,6 +112,7 @@ tonic = { version = "0.12.3", features = ["gzip"] }
tonic-build = "0.12.3"
tonic-reflection = "0.12"
tokio-stream = "0.1.17"
tokio-util = { version = "0.7.13", features = ["io", "compat"] }
tower = { version = "0.5.2", features = ["timeout"] }
tracing = "0.1.41"
tracing-core = "0.1.33"
@@ -126,6 +132,8 @@ axum = "0.7.9"
md-5 = "0.10.6"
workers = { path = "./common/workers" }
test-case = "3.3.1"
zip = "2.2.3"
snafu = "0.8.5"
[profile.wasm-dev]
inherits = "dev"

View File

@@ -140,10 +140,14 @@ impl<R> EtagReader<R> {
impl<R: AsyncRead + Unpin> AsyncRead for EtagReader<R> {
fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<tokio::io::Result<()>> {
let befor_size = buf.filled().len();
match Pin::new(&mut self.inner).poll_read(cx, buf) {
Poll::Ready(Ok(())) => {
let bytes = buf.filled();
self.md5.update(bytes);
if buf.filled().len() > befor_size {
let bytes = &buf.filled()[befor_size..];
self.md5.update(bytes);
}
Poll::Ready(Ok(()))
}
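For context on the fix above: poll_read may be called many times against the same ReadBuf, and buf.filled() returns everything accumulated so far, so hashing the whole filled slice on every call would feed earlier bytes into the MD5 state repeatedly. A minimal sketch of the same delta-hashing idea with plain slices, assuming only the md-5 crate already pinned in this workspace (the names below are illustrative, not from the codebase):

use md5::{Digest, Md5};

fn main() {
    let data = b"hello etag reader";
    // Reference digest over the whole payload in one shot.
    let expected = Md5::digest(data);

    // Simulate a buffer that grows across successive reads: hash only the
    // newly filled region each time, exactly like the patched poll_read.
    let mut hasher = Md5::new();
    let mut filled = 0usize;
    for chunk in data.chunks(5) {
        let before = filled;
        filled += chunk.len();
        hasher.update(&data[before..filled]);
    }
    assert_eq!(hasher.finalize(), expected);
}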

View File

@@ -151,7 +151,7 @@ fn read_drive_stats(stats_file: &str) -> Result<IOStats> {
fn read_stat(file_name: &str) -> Result<Vec<u64>> {
// Open the file
let path = Path::new(file_name);
let file = File::open(&path)?;
let file = File::open(path)?;
// Create a BufReader
let reader = io::BufReader::new(file);
@@ -161,7 +161,8 @@ fn read_stat(file_name: &str) -> Result<Vec<u64>> {
if let Some(line) = reader.lines().next() {
let line = line?;
// Split the line and parse each token as u64
for token in line.trim().split_whitespace() {
// https://rust-lang.github.io/rust-clippy/master/index.html#trim_split_whitespace
for token in line.split_whitespace() {
let ui64: u64 = token.parse()?;
stats.push(ui64);
}
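The clippy link above points at the trim_split_whitespace lint: split_whitespace already skips leading and trailing whitespace, so the trim call was redundant. A quick standalone check (plain std, no extra dependencies):

fn main() {
    let line = "  8 0 sda 1000 2000  ";
    let with_trim: Vec<u64> = line.trim().split_whitespace().map(|t| t.parse().unwrap()).collect();
    let without_trim: Vec<u64> = line.split_whitespace().map(|t| t.parse().unwrap()).collect();
    // Both parses yield the same numbers, so the trim call can be dropped.
    assert_eq!(with_trim, without_trim);
}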

View File

@@ -20,6 +20,8 @@ madmin.workspace = true
async-trait.workspace = true
bytes.workspace = true
clap.workspace = true
csv = "1.3.1"
datafusion = { workspace = true }
common.workspace = true
ecstore.workspace = true
policy.workspace =true
@@ -46,7 +48,7 @@ serde.workspace = true
serde_json.workspace = true
tracing.workspace = true
time = { workspace = true, features = ["parsing", "formatting", "serde"] }
tokio-util = { version = "0.7.13", features = ["io", "compat"] }
tokio-util.workspace = true
tokio = { workspace = true, features = [
"rt-multi-thread",
"macros",
@@ -71,6 +73,8 @@ const-str = { version = "0.6.1", features = ["std", "proc"] }
atoi = "2.0.0"
serde_urlencoded = "0.7.1"
crypto = { path = "../crypto" }
query = { path = "../s3select/query" }
api = { path = "../s3select/api" }
iam = { path = "../iam" }
jsonwebtoken = "9.3.0"
tower-http = { version = "0.6.2", features = ["cors"] }

View File

@@ -113,15 +113,15 @@ pub fn check_claims_from_token(header: &HeaderMap, cred: &auth::Credentials) ->
}
if token.is_empty() && cred.is_temp() && !cred.is_service_account() {
return Err(s3_error!(InvalidRequest, "invalid token"));
return Err(s3_error!(InvalidRequest, "invalid token1"));
}
if !token.is_empty() && !cred.is_temp() {
return Err(s3_error!(InvalidRequest, "invalid token"));
return Err(s3_error!(InvalidRequest, "invalid token2"));
}
if !cred.is_service_account() && cred.is_temp() && token != cred.session_token {
return Err(s3_error!(InvalidRequest, "invalid token"));
return Err(s3_error!(InvalidRequest, "invalid token3"));
}
if cred.is_temp() && cred.is_expired() {

View File

@@ -4,8 +4,14 @@ use super::options::extract_metadata;
use super::options::put_opts;
use crate::auth::get_condition_values;
use crate::storage::access::ReqInfo;
use api::query::Context;
use api::query::Query;
use api::server::dbms::DatabaseManagerSystem;
use bytes::Bytes;
use common::error::Result;
use datafusion::arrow::csv::WriterBuilder as CsvWriterBuilder;
use datafusion::arrow::json::writer::JsonArray;
use datafusion::arrow::json::WriterBuilder as JsonWriterBuilder;
use ecstore::bucket::error::BucketMetadataError;
use ecstore::bucket::metadata::BUCKET_LIFECYCLE_CONFIG;
use ecstore::bucket::metadata::BUCKET_NOTIFICATION_CONFIG;
@@ -46,6 +52,7 @@ use policy::policy::action::S3Action;
use policy::policy::BucketPolicy;
use policy::policy::BucketPolicyArgs;
use policy::policy::Validator;
use query::instance::make_rustfsms;
use s3s::dto::*;
use s3s::s3_error;
use s3s::S3Error;
@@ -55,6 +62,8 @@ use s3s::S3;
use s3s::{S3Request, S3Response};
use std::fmt::Debug;
use std::str::FromStr;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tokio_util::io::ReaderStream;
use tokio_util::io::StreamReader;
use tracing::debug;
@@ -1859,6 +1868,69 @@ impl S3 for FS {
}
Ok(S3Response::new(PutObjectAclOutput::default()))
}
async fn select_object_content(
&self,
req: S3Request<SelectObjectContentInput>,
) -> S3Result<S3Response<SelectObjectContentOutput>> {
info!("handle select_object_content");
let input = req.input;
info!("{:?}", input);
let db = make_rustfsms(input.clone(), false).await.map_err(|e| {
error!("make db failed, {}", e.to_string());
s3_error!(InternalError)
})?;
let query = Query::new(Context { input: input.clone() }, input.request.expression);
let result = db.execute(&query).await.map_err(|_| s3_error!(InternalError))?;
let results = result.result().chunk_result().await.unwrap().to_vec();
let mut buffer = Vec::new();
if input.request.output_serialization.csv.is_some() {
let mut csv_writer = CsvWriterBuilder::new().with_header(false).build(&mut buffer);
for batch in results {
csv_writer
.write(&batch)
.map_err(|e| s3_error!(InternalError, "can't encode output to csv. e: {}", e.to_string()))?;
}
} else if input.request.output_serialization.json.is_some() {
let mut json_writer = JsonWriterBuilder::new()
.with_explicit_nulls(true)
.build::<_, JsonArray>(&mut buffer);
for batch in results {
json_writer
.write(&batch)
.map_err(|e| s3_error!(InternalError, "can't encode output to json. e: {}", e.to_string()))?;
}
json_writer
.finish()
.map_err(|e| s3_error!(InternalError, "can't finish json output. e: {}", e.to_string()))?;
} else {
return Err(s3_error!(InvalidArgument, "unknown output format"));
}
let (tx, rx) = mpsc::channel::<S3Result<SelectObjectContentEvent>>(2);
let stream = ReceiverStream::new(rx);
tokio::spawn(async move {
let _ = tx
.send(Ok(SelectObjectContentEvent::Cont(ContinuationEvent::default())))
.await;
let _ = tx
.send(Ok(SelectObjectContentEvent::Records(RecordsEvent {
payload: Some(Bytes::from(buffer)),
})))
.await;
let _ = tx.send(Ok(SelectObjectContentEvent::End(EndEvent::default()))).await;
drop(tx);
});
Ok(S3Response::new(SelectObjectContentOutput {
payload: Some(SelectObjectContentEventStream::new(stream)),
}))
}
}
#[allow(dead_code)]
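As a side note on the CSV branch of the new handler: the arrow writers stream RecordBatches into an in-memory buffer, and that buffer becomes the Records event payload. A minimal, self-contained sketch of that encoding step, assuming the datafusion/arrow versions pinned in this workspace (the sample schema and values are made up for illustration):

use std::sync::Arc;
use datafusion::arrow::array::{Int32Array, StringArray};
use datafusion::arrow::csv::WriterBuilder as CsvWriterBuilder;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("name", DataType::Utf8, false),
    ]));
    let batch = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(Int32Array::from(vec![1, 2])),
            Arc::new(StringArray::from(vec!["Alice", "Bob"])),
        ],
    )?;

    // Same shape as the handler: encode batches into a Vec<u8> that can be
    // wrapped in Bytes and sent as a RecordsEvent payload.
    let mut buffer = Vec::new();
    {
        let mut writer = CsvWriterBuilder::new().with_header(false).build(&mut buffer);
        writer.write(&batch)?;
    }
    assert_eq!(String::from_utf8(buffer)?, "1,Alice\n2,Bob\n");
    Ok(())
}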

s3select/api/Cargo.toml (new file, 22 lines)
View File

@@ -0,0 +1,22 @@
[package]
name = "api"
version.workspace = true
edition.workspace = true
[dependencies]
async-trait.workspace = true
bytes.workspace = true
chrono.workspace = true
datafusion = { workspace = true }
ecstore.workspace = true
futures = { workspace = true }
futures-core = "0.3.31"
http.workspace = true
object_store = "0.11.2"
s3s.workspace = true
snafu = { workspace = true, features = ["backtrace"] }
tokio.workspace = true
tokio-util.workspace = true
tracing.workspace = true
transform-stream.workspace = true
url.workspace = true

s3select/api/src/lib.rs (new file, 77 lines)
View File

@@ -0,0 +1,77 @@
use std::fmt::Display;
use datafusion::{common::DataFusionError, sql::sqlparser::parser::ParserError};
use snafu::{Backtrace, Location, Snafu};
pub mod object_store;
pub mod query;
pub mod server;
pub type QueryResult<T> = Result<T, QueryError>;
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum QueryError {
Datafusion {
source: DataFusionError,
location: Location,
backtrace: Backtrace,
},
#[snafu(display("This feature is not implemented: {}", err))]
NotImplemented { err: String },
#[snafu(display("Multi-statement not allowed, found num:{}, sql:{}", num, sql))]
MultiStatement { num: usize, sql: String },
#[snafu(display("Failed to build QueryDispatcher. err: {}", err))]
BuildQueryDispatcher { err: String },
#[snafu(display("The query has been canceled"))]
Cancel,
#[snafu(display("{}", source))]
Parser { source: ParserError },
#[snafu(display("Udf does not exist, name:{}.", name))]
FunctionNotExists { name: String },
#[snafu(display("Udf already exists, name:{}.", name))]
FunctionExists { name: String },
#[snafu(display("Store Error, e:{}.", e))]
StoreError { e: String },
}
impl From<DataFusionError> for QueryError {
fn from(value: DataFusionError) -> Self {
match value {
DataFusionError::External(e) if e.downcast_ref::<QueryError>().is_some() => *e.downcast::<QueryError>().unwrap(),
v => Self::Datafusion {
source: v,
location: Default::default(),
backtrace: Backtrace::capture(),
},
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ResolvedTable {
// path
table: String,
}
impl ResolvedTable {
pub fn table(&self) -> &str {
&self.table
}
}
impl Display for ResolvedTable {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let Self { table } = self;
write!(f, "{table}")
}
}
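The From<DataFusionError> impl above lets a QueryError survive a round trip through DataFusion's External wrapper instead of being flattened into the generic Datafusion variant. A small sketch of that behaviour, assuming it runs from a crate that depends on both this api crate and datafusion:

use api::QueryError;
use datafusion::common::DataFusionError;

fn main() {
    // A QueryError wrapped as an external error is recovered as-is ...
    let wrapped = DataFusionError::External(Box::new(QueryError::Cancel));
    let recovered: QueryError = wrapped.into();
    assert!(matches!(recovered, QueryError::Cancel));

    // ... while any other DataFusionError falls through to the Datafusion variant.
    let other: QueryError = DataFusionError::Plan("boom".to_string()).into();
    assert!(matches!(other, QueryError::Datafusion { .. }));
}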

View File

@@ -0,0 +1,177 @@
use async_trait::async_trait;
use bytes::Bytes;
use chrono::Utc;
use ecstore::io::READ_BUFFER_SIZE;
use ecstore::new_object_layer_fn;
use ecstore::store::ECStore;
use ecstore::store_api::ObjectIO;
use ecstore::store_api::ObjectOptions;
use ecstore::StorageAPI;
use futures::pin_mut;
use futures::{Stream, StreamExt};
use futures_core::stream::BoxStream;
use http::HeaderMap;
use object_store::path::Path;
use object_store::Attributes;
use object_store::GetOptions;
use object_store::GetResult;
use object_store::ListResult;
use object_store::MultipartUpload;
use object_store::ObjectMeta;
use object_store::ObjectStore;
use object_store::PutMultipartOpts;
use object_store::PutOptions;
use object_store::PutPayload;
use object_store::PutResult;
use object_store::{Error as o_Error, Result};
use s3s::dto::SelectObjectContentInput;
use s3s::s3_error;
use s3s::S3Result;
use std::ops::Range;
use std::sync::Arc;
use tokio_util::io::ReaderStream;
use tracing::info;
use transform_stream::AsyncTryStream;
#[derive(Debug)]
pub struct EcObjectStore {
input: SelectObjectContentInput,
store: Arc<ECStore>,
}
impl EcObjectStore {
pub fn new(input: SelectObjectContentInput) -> S3Result<Self> {
let Some(store) = new_object_layer_fn() else {
return Err(s3_error!(InternalError, "ec store not initialized"));
};
Ok(Self { input, store })
}
}
impl std::fmt::Display for EcObjectStore {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str("EcObjectStore")
}
}
#[async_trait]
impl ObjectStore for EcObjectStore {
async fn put_opts(&self, _location: &Path, _payload: PutPayload, _opts: PutOptions) -> Result<PutResult> {
unimplemented!()
}
async fn put_multipart_opts(&self, _location: &Path, _opts: PutMultipartOpts) -> Result<Box<dyn MultipartUpload>> {
unimplemented!()
}
async fn get_opts(&self, location: &Path, _options: GetOptions) -> Result<GetResult> {
info!("{:?}", location);
let opts = ObjectOptions::default();
let h = HeaderMap::new();
let reader = self
.store
.get_object_reader(&self.input.bucket, &self.input.key, None, h, &opts)
.await
.map_err(|_| o_Error::NotFound {
path: format!("{}/{}", self.input.bucket, self.input.key),
source: "can not get object info".into(),
})?;
// let stream = stream::unfold(reader.stream, |mut blob| async move {
// match blob.next().await {
// Some(Ok(chunk)) => {
// let bytes = chunk;
// Some((Ok(bytes), blob))
// }
// _ => None,
// }
// })
// .boxed();
let meta = ObjectMeta {
location: location.clone(),
last_modified: Utc::now(),
size: reader.object_info.size,
e_tag: reader.object_info.etag,
version: None,
};
let attributes = Attributes::default();
Ok(GetResult {
payload: object_store::GetResultPayload::Stream(
bytes_stream(ReaderStream::with_capacity(reader.stream, READ_BUFFER_SIZE), reader.object_info.size).boxed(),
),
meta,
range: 0..reader.object_info.size,
attributes,
})
}
async fn get_ranges(&self, _location: &Path, _ranges: &[Range<usize>]) -> Result<Vec<Bytes>> {
unimplemented!()
}
async fn head(&self, location: &Path) -> Result<ObjectMeta> {
info!("{:?}", location);
let opts = ObjectOptions::default();
let info = self
.store
.get_object_info(&self.input.bucket, &self.input.key, &opts)
.await
.map_err(|_| o_Error::NotFound {
path: format!("{}/{}", self.input.bucket, self.input.key),
source: "can not get object info".into(),
})?;
Ok(ObjectMeta {
location: location.clone(),
last_modified: Utc::now(),
size: info.size,
e_tag: info.etag,
version: None,
})
}
async fn delete(&self, _location: &Path) -> Result<()> {
unimplemented!()
}
fn list(&self, _prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
unimplemented!()
}
async fn list_with_delimiter(&self, _prefix: Option<&Path>) -> Result<ListResult> {
unimplemented!()
}
async fn copy(&self, _from: &Path, _to: &Path) -> Result<()> {
unimplemented!()
}
async fn copy_if_not_exists(&self, _from: &Path, _too: &Path) -> Result<()> {
unimplemented!()
}
}
pub fn bytes_stream<S>(stream: S, content_length: usize) -> impl Stream<Item = Result<Bytes>> + Send + 'static
where
S: Stream<Item = Result<Bytes, std::io::Error>> + Send + 'static,
{
AsyncTryStream::<Bytes, o_Error, _>::new(|mut y| async move {
pin_mut!(stream);
let mut remaining: usize = content_length;
while let Some(result) = stream.next().await {
let mut bytes = result.map_err(|e| o_Error::Generic {
store: "",
source: Box::new(e),
})?;
if bytes.len() > remaining {
bytes.truncate(remaining);
}
remaining -= bytes.len();
y.yield_ok(bytes).await;
}
Ok(())
})
}
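bytes_stream above exists to cap the byte stream at the object's content_length, truncating the final chunk if the underlying reader yields more. A minimal usage sketch, assuming the function is exported as api::object_store::bytes_stream and that tokio (rt plus macros), futures, and tokio-util are available:

use api::object_store::bytes_stream;
use futures::StreamExt;
use tokio_util::io::ReaderStream;

#[tokio::main]
async fn main() {
    // A reader that is longer than the declared content length.
    let reader = &b"hello world"[..];
    let stream = bytes_stream(ReaderStream::new(reader), 5);
    futures::pin_mut!(stream);

    let mut collected = Vec::new();
    while let Some(chunk) = stream.next().await {
        collected.extend_from_slice(&chunk.unwrap());
    }
    // Only content_length bytes survive; the rest is truncated.
    assert_eq!(collected, b"hello".to_vec());
}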

View File

@@ -0,0 +1,12 @@
use std::sync::Arc;
use datafusion::logical_expr::LogicalPlan;
use super::session::SessionCtx;
use crate::QueryResult;
pub type AnalyzerRef = Arc<dyn Analyzer + Send + Sync>;
pub trait Analyzer {
fn analyze(&self, plan: &LogicalPlan, session: &SessionCtx) -> QueryResult<LogicalPlan>;
}

View File

@@ -0,0 +1,8 @@
use datafusion::sql::sqlparser::ast::Statement;
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ExtStatement {
/// ANSI SQL AST node
SqlStatement(Box<Statement>),
// we can extend this with more commands
}

View File

@@ -0,0 +1 @@

View File

@@ -0,0 +1,32 @@
use std::sync::Arc;
use async_trait::async_trait;
use crate::QueryResult;
use super::{
execution::{Output, QueryStateMachine},
logical_planner::Plan,
Query,
};
#[async_trait]
pub trait QueryDispatcher: Send + Sync {
// fn create_query_id(&self) -> QueryId;
// fn query_info(&self, id: &QueryId);
async fn execute_query(&self, query: &Query) -> QueryResult<Output>;
async fn build_logical_plan(&self, query_state_machine: Arc<QueryStateMachine>) -> QueryResult<Option<Plan>>;
async fn execute_logical_plan(&self, logical_plan: Plan, query_state_machine: Arc<QueryStateMachine>) -> QueryResult<Output>;
async fn build_query_state_machine(&self, query: Query) -> QueryResult<Arc<QueryStateMachine>>;
// fn running_query_infos(&self) -> Vec<QueryInfo>;
// fn running_query_status(&self) -> Vec<QueryStatus>;
// fn cancel_query(&self, id: &QueryId);
}

View File

@@ -0,0 +1,241 @@
use std::fmt::Display;
use std::pin::Pin;
use std::sync::atomic::{AtomicPtr, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use async_trait::async_trait;
use datafusion::arrow::datatypes::{Schema, SchemaRef};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::physical_plan::SendableRecordBatchStream;
use futures::{Stream, StreamExt, TryStreamExt};
use crate::{QueryError, QueryResult};
use super::logical_planner::Plan;
use super::session::SessionCtx;
use super::Query;
pub type QueryExecutionRef = Arc<dyn QueryExecution>;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QueryType {
Batch,
Stream,
}
impl Display for QueryType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Batch => write!(f, "batch"),
Self::Stream => write!(f, "stream"),
}
}
}
#[async_trait]
pub trait QueryExecution: Send + Sync {
fn query_type(&self) -> QueryType {
QueryType::Batch
}
// Start
async fn start(&self) -> QueryResult<Output>;
// Stop
fn cancel(&self) -> QueryResult<()>;
}
pub enum Output {
StreamData(SendableRecordBatchStream),
Nil(()),
}
impl Output {
pub fn schema(&self) -> SchemaRef {
match self {
Self::StreamData(stream) => stream.schema(),
Self::Nil(_) => Arc::new(Schema::empty()),
}
}
pub async fn chunk_result(self) -> QueryResult<Vec<RecordBatch>> {
match self {
Self::Nil(_) => Ok(vec![]),
Self::StreamData(stream) => {
let schema = stream.schema();
let mut res: Vec<RecordBatch> = stream.try_collect::<Vec<RecordBatch>>().await?;
if res.is_empty() {
res.push(RecordBatch::new_empty(schema));
}
Ok(res)
}
}
}
pub async fn num_rows(self) -> usize {
match self.chunk_result().await {
Ok(rb) => rb.iter().map(|e| e.num_rows()).sum(),
Err(_) => 0,
}
}
/// Returns the number of records affected by the query operation
///
/// If it is a select statement, returns the number of rows in the result set
///
/// -1 means unknown
///
/// Panics when StreamData's record count is greater than i64::MAX
pub async fn affected_rows(self) -> i64 {
self.num_rows().await as i64
}
}
impl Stream for Output {
type Item = std::result::Result<RecordBatch, QueryError>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
match this {
Output::StreamData(stream) => stream.poll_next_unpin(cx).map_err(|e| e.into()),
Output::Nil(_) => Poll::Ready(None),
}
}
}
#[async_trait]
pub trait QueryExecutionFactory {
async fn create_query_execution(
&self,
plan: Plan,
query_state_machine: QueryStateMachineRef,
) -> QueryResult<QueryExecutionRef>;
}
pub type QueryStateMachineRef = Arc<QueryStateMachine>;
pub struct QueryStateMachine {
pub session: SessionCtx,
pub query: Query,
state: AtomicPtr<QueryState>,
start: Instant,
}
impl QueryStateMachine {
pub fn begin(query: Query, session: SessionCtx) -> Self {
Self {
session,
query,
state: AtomicPtr::new(Box::into_raw(Box::new(QueryState::ACCEPTING))),
start: Instant::now(),
}
}
pub fn begin_analyze(&self) {
// TODO record time
self.translate_to(Box::new(QueryState::RUNNING(RUNNING::ANALYZING)));
}
pub fn end_analyze(&self) {
// TODO record time
}
pub fn begin_optimize(&self) {
// TODO record time
self.translate_to(Box::new(QueryState::RUNNING(RUNNING::OPTMIZING)));
}
pub fn end_optimize(&self) {
// TODO
}
pub fn begin_schedule(&self) {
// TODO
self.translate_to(Box::new(QueryState::RUNNING(RUNNING::SCHEDULING)));
}
pub fn end_schedule(&self) {
// TODO
}
pub fn finish(&self) {
// TODO
self.translate_to(Box::new(QueryState::DONE(DONE::FINISHED)));
}
pub fn cancel(&self) {
// TODO
self.translate_to(Box::new(QueryState::DONE(DONE::CANCELLED)));
}
pub fn fail(&self) {
// TODO
self.translate_to(Box::new(QueryState::DONE(DONE::FAILED)));
}
pub fn state(&self) -> &QueryState {
unsafe { &*self.state.load(Ordering::Relaxed) }
}
pub fn duration(&self) -> Duration {
self.start.elapsed()
}
fn translate_to(&self, state: Box<QueryState>) {
self.state.store(Box::into_raw(state), Ordering::Relaxed);
}
}
#[derive(Debug, Clone)]
pub enum QueryState {
ACCEPTING,
RUNNING(RUNNING),
DONE(DONE),
}
impl AsRef<str> for QueryState {
fn as_ref(&self) -> &str {
match self {
QueryState::ACCEPTING => "ACCEPTING",
QueryState::RUNNING(e) => e.as_ref(),
QueryState::DONE(e) => e.as_ref(),
}
}
}
#[derive(Debug, Clone)]
pub enum RUNNING {
DISPATCHING,
ANALYZING,
OPTMIZING,
SCHEDULING,
}
impl AsRef<str> for RUNNING {
fn as_ref(&self) -> &str {
match self {
Self::DISPATCHING => "DISPATCHING",
Self::ANALYZING => "ANALYZING",
Self::OPTMIZING => "OPTMIZING",
Self::SCHEDULING => "SCHEDULING",
}
}
}
#[derive(Debug, Clone)]
pub enum DONE {
FINISHED,
FAILED,
CANCELLED,
}
impl AsRef<str> for DONE {
fn as_ref(&self) -> &str {
match self {
Self::FINISHED => "FINISHED",
Self::FAILED => "FAILED",
Self::CANCELLED => "CANCELLED",
}
}
}
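A small sketch of how the Output API above behaves for the trivial Nil case, matching the doc comment on affected_rows (assuming this api crate plus tokio with the rt and macros features):

use api::query::execution::Output;

#[tokio::main]
async fn main() {
    // Nil carries no record batches ...
    let out = Output::Nil(());
    assert!(out.chunk_result().await.unwrap().is_empty());

    // ... so both row counters report zero (chunk_result/num_rows consume self).
    assert_eq!(Output::Nil(()).num_rows().await, 0);
    assert_eq!(Output::Nil(()).affected_rows().await, 0);
}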

View File

@@ -0,0 +1,23 @@
use std::collections::HashSet;
use std::sync::Arc;
use datafusion::logical_expr::{AggregateUDF, ScalarUDF, WindowUDF};
use crate::QueryResult;
pub type FuncMetaManagerRef = Arc<dyn FunctionMetadataManager + Send + Sync>;
pub trait FunctionMetadataManager {
fn register_udf(&mut self, udf: ScalarUDF) -> QueryResult<()>;
fn register_udaf(&mut self, udaf: AggregateUDF) -> QueryResult<()>;
fn register_udwf(&mut self, udwf: WindowUDF) -> QueryResult<()>;
fn udf(&self, name: &str) -> QueryResult<Arc<ScalarUDF>>;
fn udaf(&self, name: &str) -> QueryResult<Arc<AggregateUDF>>;
fn udwf(&self, name: &str) -> QueryResult<Arc<WindowUDF>>;
fn udfs(&self) -> HashSet<String>;
}

View File

@@ -0,0 +1,40 @@
use async_trait::async_trait;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::logical_expr::LogicalPlan as DFPlan;
use crate::QueryResult;
use super::ast::ExtStatement;
use super::session::SessionCtx;
#[derive(Clone)]
pub enum Plan {
// only support query sql
/// Query plan
Query(QueryPlan),
}
impl Plan {
pub fn schema(&self) -> SchemaRef {
match self {
Self::Query(p) => SchemaRef::from(p.df_plan.schema().as_ref().to_owned()),
}
}
}
#[derive(Debug, Clone)]
pub struct QueryPlan {
pub df_plan: DFPlan,
pub is_tag_scan: bool,
}
impl QueryPlan {
pub fn is_explain(&self) -> bool {
matches!(self.df_plan, DFPlan::Explain(_) | DFPlan::Analyze(_))
}
}
#[async_trait]
pub trait LogicalPlanner {
async fn create_logical_plan(&self, statement: ExtStatement, session: &SessionCtx) -> QueryResult<Plan>;
}

View File

@@ -0,0 +1,41 @@
use s3s::dto::SelectObjectContentInput;
pub mod analyzer;
pub mod ast;
pub mod datasource;
pub mod dispatcher;
pub mod execution;
pub mod function;
pub mod logical_planner;
pub mod optimizer;
pub mod parser;
pub mod physical_planner;
pub mod scheduler;
pub mod session;
#[derive(Clone)]
pub struct Context {
// maybe we need to transfer some info?
pub input: SelectObjectContentInput,
}
#[derive(Clone)]
pub struct Query {
context: Context,
content: String,
}
impl Query {
#[inline(always)]
pub fn new(context: Context, content: String) -> Self {
Self { context, content }
}
pub fn context(&self) -> &Context {
&self.context
}
pub fn content(&self) -> &str {
self.content.as_str()
}
}

View File

@@ -0,0 +1,15 @@
use std::sync::Arc;
use async_trait::async_trait;
use datafusion::physical_plan::ExecutionPlan;
use super::logical_planner::QueryPlan;
use super::session::SessionCtx;
use crate::QueryResult;
pub type OptimizerRef = Arc<dyn Optimizer + Send + Sync>;
#[async_trait]
pub trait Optimizer {
async fn optimize(&self, plan: &QueryPlan, session: &SessionCtx) -> QueryResult<Arc<dyn ExecutionPlan>>;
}

View File

@@ -0,0 +1,8 @@
use std::collections::VecDeque;
use super::ast::ExtStatement;
use crate::QueryResult;
pub trait Parser {
fn parse(&self, sql: &str) -> QueryResult<VecDeque<ExtStatement>>;
}

View File

@@ -0,0 +1,21 @@
use std::sync::Arc;
use async_trait::async_trait;
use datafusion::logical_expr::LogicalPlan;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_planner::ExtensionPlanner;
use super::session::SessionCtx;
use crate::QueryResult;
#[async_trait]
pub trait PhysicalPlanner {
/// Given a `LogicalPlan`, create an `ExecutionPlan` suitable for execution
async fn create_physical_plan(
&self,
logical_plan: &LogicalPlan,
session_state: &SessionCtx,
) -> QueryResult<Arc<dyn ExecutionPlan>>;
fn inject_physical_transform_rule(&mut self, rule: Arc<dyn ExtensionPlanner + Send + Sync>);
}

View File

@@ -0,0 +1,32 @@
use std::sync::Arc;
use async_trait::async_trait;
use datafusion::common::Result;
use datafusion::execution::context::TaskContext;
use datafusion::physical_plan::{ExecutionPlan, SendableRecordBatchStream};
pub type SchedulerRef = Arc<dyn Scheduler + Send + Sync>;
#[async_trait]
pub trait Scheduler {
/// Schedule the provided [`ExecutionPlan`] on this [`Scheduler`].
///
/// Returns a [`ExecutionResults`] that can be used to receive results as they are produced,
/// as a [`futures::Stream`] of [`RecordBatch`]
async fn schedule(&self, plan: Arc<dyn ExecutionPlan>, context: Arc<TaskContext>) -> Result<ExecutionResults>;
}
pub struct ExecutionResults {
stream: SendableRecordBatchStream,
}
impl ExecutionResults {
pub fn new(stream: SendableRecordBatchStream) -> Self {
Self { stream }
}
/// Returns a [`SendableRecordBatchStream`] of this execution
pub fn stream(self) -> SendableRecordBatchStream {
self.stream
}
}

View File

@@ -0,0 +1,86 @@
use std::sync::Arc;
use bytes::Bytes;
use datafusion::{
execution::{context::SessionState, runtime_env::RuntimeEnvBuilder, SessionStateBuilder},
prelude::SessionContext,
};
use object_store::{memory::InMemory, path::Path, ObjectStore};
use tracing::error;
use crate::{object_store::EcObjectStore, QueryError, QueryResult};
use super::Context;
#[derive(Clone)]
pub struct SessionCtx {
_desc: Arc<SessionCtxDesc>,
inner: SessionState,
}
impl SessionCtx {
pub fn inner(&self) -> &SessionState {
&self.inner
}
}
#[derive(Clone)]
pub struct SessionCtxDesc {
// maybe we need some info
}
#[derive(Default)]
pub struct SessionCtxFactory {
pub is_test: bool,
}
impl SessionCtxFactory {
pub async fn create_session_ctx(&self, context: &Context) -> QueryResult<SessionCtx> {
let df_session_ctx = self.build_df_session_context(context).await?;
Ok(SessionCtx {
_desc: Arc::new(SessionCtxDesc {}),
inner: df_session_ctx.state(),
})
}
async fn build_df_session_context(&self, context: &Context) -> QueryResult<SessionContext> {
let path = format!("s3://{}", context.input.bucket);
let store_url = url::Url::parse(&path).unwrap();
let rt = RuntimeEnvBuilder::new().build()?;
let df_session_state = SessionStateBuilder::new()
.with_runtime_env(Arc::new(rt))
.with_default_features();
let df_session_state = if self.is_test {
let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let data = b"id,name,age,department,salary
1,Alice,25,HR,5000
2,Bob,30,IT,6000
3,Charlie,35,Finance,7000
4,Diana,22,Marketing,4500
5,Eve,28,IT,5500
6,Frank,40,Finance,8000
7,Grace,26,HR,5200
8,Henry,32,IT,6200
9,Ivy,24,Marketing,4800
10,Jack,38,Finance,7500";
let data_bytes = Bytes::from(data.to_vec());
let path = Path::from(context.input.key.clone());
store.put(&path, data_bytes.into()).await.map_err(|e| {
error!("put data into memory failed: {}", e.to_string());
QueryError::StoreError { e: e.to_string() }
})?;
df_session_state.with_object_store(&store_url, Arc::new(store)).build()
} else {
let store =
EcObjectStore::new(context.input.clone()).map_err(|_| QueryError::NotImplemented { err: String::new() })?;
df_session_state.with_object_store(&store_url, Arc::new(store)).build()
};
let df_session_ctx = SessionContext::new_with_state(df_session_state);
Ok(df_session_ctx)
}
}

View File

@@ -0,0 +1,41 @@
use async_trait::async_trait;
use crate::{
query::{
execution::{Output, QueryStateMachineRef},
logical_planner::Plan,
Query,
},
QueryResult,
};
pub struct QueryHandle {
query: Query,
result: Output,
}
impl QueryHandle {
pub fn new(query: Query, result: Output) -> Self {
Self { query, result }
}
pub fn query(&self) -> &Query {
&self.query
}
pub fn result(self) -> Output {
self.result
}
}
#[async_trait]
pub trait DatabaseManagerSystem {
async fn execute(&self, query: &Query) -> QueryResult<QueryHandle>;
async fn build_query_state_machine(&self, query: Query) -> QueryResult<QueryStateMachineRef>;
async fn build_logical_plan(&self, query_state_machine: QueryStateMachineRef) -> QueryResult<Option<Plan>>;
async fn execute_logical_plan(
&self,
logical_plan: Plan,
query_state_machine: QueryStateMachineRef,
) -> QueryResult<QueryHandle>;
}

View File

@@ -0,0 +1 @@
pub mod dbms;

s3select/query/Cargo.toml (new file, 17 lines)
View File

@@ -0,0 +1,17 @@
[package]
name = "query"
version.workspace = true
edition.workspace = true
[dependencies]
api = { path = "../api" }
async-recursion = { workspace = true }
async-trait.workspace = true
datafusion = { workspace = true }
derive_builder = { workspace = true }
futures = { workspace = true }
parking_lot = { version = "0.12.1" }
s3s.workspace = true
snafu = { workspace = true, features = ["backtrace"] }
tokio = { workspace = true }
tracing = { workspace = true }

View File

@@ -0,0 +1 @@
pub mod table_source;

View File

@@ -0,0 +1,138 @@
use std::any::Any;
use std::borrow::Cow;
use std::fmt::Display;
use std::sync::Arc;
use std::write;
use async_trait::async_trait;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::common::Result as DFResult;
use datafusion::datasource::listing::ListingTable;
use datafusion::datasource::{provider_as_source, TableProvider};
use datafusion::error::DataFusionError;
use datafusion::logical_expr::{LogicalPlan, LogicalPlanBuilder, TableProviderFilterPushDown, TableSource};
use datafusion::prelude::Expr;
use datafusion::sql::TableReference;
use tracing::debug;
pub const TEMP_LOCATION_TABLE_NAME: &str = "external_location_table";
pub struct TableSourceAdapter {
database_name: String,
table_name: String,
table_handle: TableHandle,
plan: LogicalPlan,
}
impl TableSourceAdapter {
pub fn try_new(
table_ref: impl Into<TableReference>,
table_name: impl Into<String>,
table_handle: impl Into<TableHandle>,
) -> Result<Self, DataFusionError> {
let table_name: String = table_name.into();
let table_handle = table_handle.into();
let plan = match &table_handle {
// TableScan
TableHandle::External(t) => {
let table_source = provider_as_source(t.clone());
LogicalPlanBuilder::scan(table_ref, table_source, None)?.build()?
}
// TableScan
TableHandle::TableProvider(t) => {
let table_source = provider_as_source(t.clone());
if let Some(plan) = table_source.get_logical_plan() {
LogicalPlanBuilder::from(plan.into_owned()).build()?
} else {
LogicalPlanBuilder::scan(table_ref, table_source, None)?.build()?
}
}
};
debug!("Table source logical plan node of {}:\n{}", table_name, plan.display_indent_schema());
Ok(Self {
database_name: "default_db".to_string(),
table_name,
table_handle,
plan,
})
}
pub fn database_name(&self) -> &str {
&self.database_name
}
pub fn table_name(&self) -> &str {
&self.table_name
}
pub fn table_handle(&self) -> &TableHandle {
&self.table_handle
}
}
#[async_trait]
impl TableSource for TableSourceAdapter {
fn as_any(&self) -> &dyn Any {
self
}
fn schema(&self) -> SchemaRef {
self.table_handle.schema()
}
fn supports_filters_pushdown(&self, filter: &[&Expr]) -> DFResult<Vec<TableProviderFilterPushDown>> {
self.table_handle.supports_filters_pushdown(filter)
}
/// Called by [`InlineTableScan`]
fn get_logical_plan(&self) -> Option<Cow<LogicalPlan>> {
Some(Cow::Owned(self.plan.clone()))
}
}
#[derive(Clone)]
pub enum TableHandle {
TableProvider(Arc<dyn TableProvider>),
External(Arc<ListingTable>),
}
impl TableHandle {
pub fn schema(&self) -> SchemaRef {
match self {
Self::External(t) => t.schema(),
Self::TableProvider(t) => t.schema(),
}
}
pub fn supports_filters_pushdown(&self, filter: &[&Expr]) -> DFResult<Vec<TableProviderFilterPushDown>> {
match self {
Self::External(t) => t.supports_filters_pushdown(filter),
Self::TableProvider(t) => t.supports_filters_pushdown(filter),
}
}
}
impl From<Arc<dyn TableProvider>> for TableHandle {
fn from(value: Arc<dyn TableProvider>) -> Self {
TableHandle::TableProvider(value)
}
}
impl From<Arc<ListingTable>> for TableHandle {
fn from(value: Arc<ListingTable>) -> Self {
TableHandle::External(value)
}
}
impl Display for TableHandle {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::External(e) => write!(f, "External({:?})", e.table_paths()),
Self::TableProvider(_) => write!(f, "TableProvider"),
}
}
}

View File

@@ -0,0 +1,271 @@
use std::{
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use api::{
query::{
ast::ExtStatement,
dispatcher::QueryDispatcher,
execution::{Output, QueryStateMachine},
function::FuncMetaManagerRef,
logical_planner::{LogicalPlanner, Plan},
parser::Parser,
session::{SessionCtx, SessionCtxFactory},
Query,
},
QueryError, QueryResult,
};
use async_trait::async_trait;
use datafusion::{
arrow::{datatypes::SchemaRef, record_batch::RecordBatch},
config::CsvOptions,
datasource::{
file_format::{csv::CsvFormat, json::JsonFormat, parquet::ParquetFormat},
listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl},
},
error::Result as DFResult,
execution::{RecordBatchStream, SendableRecordBatchStream},
};
use futures::{Stream, StreamExt};
use s3s::dto::SelectObjectContentInput;
use crate::{
execution::factory::QueryExecutionFactoryRef,
metadata::{base_table::BaseTableProvider, ContextProviderExtension, MetadataProvider, TableHandleProviderRef},
sql::logical::planner::DefaultLogicalPlanner,
};
#[derive(Clone)]
pub struct SimpleQueryDispatcher {
input: SelectObjectContentInput,
// client for default tenant
_default_table_provider: TableHandleProviderRef,
session_factory: Arc<SessionCtxFactory>,
// parser
parser: Arc<dyn Parser + Send + Sync>,
// get query execution factory
query_execution_factory: QueryExecutionFactoryRef,
func_manager: FuncMetaManagerRef,
}
#[async_trait]
impl QueryDispatcher for SimpleQueryDispatcher {
async fn execute_query(&self, query: &Query) -> QueryResult<Output> {
let query_state_machine = { self.build_query_state_machine(query.clone()).await? };
let logical_plan = self.build_logical_plan(query_state_machine.clone()).await?;
let logical_plan = match logical_plan {
Some(plan) => plan,
None => return Ok(Output::Nil(())),
};
let result = self.execute_logical_plan(logical_plan, query_state_machine).await?;
Ok(result)
}
async fn build_logical_plan(&self, query_state_machine: Arc<QueryStateMachine>) -> QueryResult<Option<Plan>> {
let session = &query_state_machine.session;
let query = &query_state_machine.query;
let scheme_provider = self.build_scheme_provider(session).await?;
let logical_planner = DefaultLogicalPlanner::new(&scheme_provider);
let statements = self.parser.parse(query.content())?;
// multiple statements are not allowed
if statements.len() > 1 {
return Err(QueryError::MultiStatement {
num: statements.len(),
sql: query_state_machine.query.content().to_string(),
});
}
let stmt = match statements.front() {
Some(stmt) => stmt.clone(),
None => return Ok(None),
};
let logical_plan = self
.statement_to_logical_plan(stmt, &logical_planner, query_state_machine)
.await?;
Ok(Some(logical_plan))
}
async fn execute_logical_plan(&self, logical_plan: Plan, query_state_machine: Arc<QueryStateMachine>) -> QueryResult<Output> {
self.execute_logical_plan(logical_plan, query_state_machine).await
}
async fn build_query_state_machine(&self, query: Query) -> QueryResult<Arc<QueryStateMachine>> {
let session = self.session_factory.create_session_ctx(query.context()).await?;
let query_state_machine = Arc::new(QueryStateMachine::begin(query, session));
Ok(query_state_machine)
}
}
impl SimpleQueryDispatcher {
async fn statement_to_logical_plan<S: ContextProviderExtension + Send + Sync>(
&self,
stmt: ExtStatement,
logical_planner: &DefaultLogicalPlanner<'_, S>,
query_state_machine: Arc<QueryStateMachine>,
) -> QueryResult<Plan> {
// begin analyze
query_state_machine.begin_analyze();
let logical_plan = logical_planner
.create_logical_plan(stmt, &query_state_machine.session)
.await?;
query_state_machine.end_analyze();
Ok(logical_plan)
}
async fn execute_logical_plan(&self, logical_plan: Plan, query_state_machine: Arc<QueryStateMachine>) -> QueryResult<Output> {
let execution = self
.query_execution_factory
.create_query_execution(logical_plan, query_state_machine.clone())
.await?;
match execution.start().await {
Ok(Output::StreamData(stream)) => Ok(Output::StreamData(Box::pin(TrackedRecordBatchStream { inner: stream }))),
Ok(nil @ Output::Nil(_)) => Ok(nil),
Err(err) => Err(err),
}
}
async fn build_scheme_provider(&self, session: &SessionCtx) -> QueryResult<MetadataProvider> {
let path = format!("s3://{}/{}", self.input.bucket, self.input.key);
let table_path = ListingTableUrl::parse(path)?;
let listing_options = if self.input.request.input_serialization.csv.is_some() {
let file_format = CsvFormat::default().with_options(CsvOptions::default().with_has_header(false));
ListingOptions::new(Arc::new(file_format)).with_file_extension(".csv")
} else if self.input.request.input_serialization.parquet.is_some() {
let file_format = ParquetFormat::new();
ListingOptions::new(Arc::new(file_format)).with_file_extension(".parquet")
} else if self.input.request.input_serialization.json.is_some() {
let file_format = JsonFormat::default();
ListingOptions::new(Arc::new(file_format)).with_file_extension(".json")
} else {
return Err(QueryError::NotImplemented {
err: "unsupported file type".to_string(),
});
};
let resolve_schema = listing_options.infer_schema(session.inner(), &table_path).await?;
let config = ListingTableConfig::new(table_path)
.with_listing_options(listing_options)
.with_schema(resolve_schema);
let provider = Arc::new(ListingTable::try_new(config)?);
let current_session_table_provider = self.build_table_handle_provider()?;
let metadata_provider =
MetadataProvider::new(provider, current_session_table_provider, self.func_manager.clone(), session.clone());
Ok(metadata_provider)
}
fn build_table_handle_provider(&self) -> QueryResult<TableHandleProviderRef> {
let current_session_table_provider: Arc<BaseTableProvider> = Arc::new(BaseTableProvider::default());
Ok(current_session_table_provider)
}
}
pub struct TrackedRecordBatchStream {
inner: SendableRecordBatchStream,
}
impl RecordBatchStream for TrackedRecordBatchStream {
fn schema(&self) -> SchemaRef {
self.inner.schema()
}
}
impl Stream for TrackedRecordBatchStream {
type Item = DFResult<RecordBatch>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.inner.poll_next_unpin(cx)
}
}
#[derive(Default, Clone)]
pub struct SimpleQueryDispatcherBuilder {
input: Option<SelectObjectContentInput>,
default_table_provider: Option<TableHandleProviderRef>,
session_factory: Option<Arc<SessionCtxFactory>>,
parser: Option<Arc<dyn Parser + Send + Sync>>,
query_execution_factory: Option<QueryExecutionFactoryRef>,
func_manager: Option<FuncMetaManagerRef>,
}
impl SimpleQueryDispatcherBuilder {
pub fn with_input(mut self, input: SelectObjectContentInput) -> Self {
self.input = Some(input);
self
}
pub fn with_default_table_provider(mut self, default_table_provider: TableHandleProviderRef) -> Self {
self.default_table_provider = Some(default_table_provider);
self
}
pub fn with_session_factory(mut self, session_factory: Arc<SessionCtxFactory>) -> Self {
self.session_factory = Some(session_factory);
self
}
pub fn with_parser(mut self, parser: Arc<dyn Parser + Send + Sync>) -> Self {
self.parser = Some(parser);
self
}
pub fn with_query_execution_factory(mut self, query_execution_factory: QueryExecutionFactoryRef) -> Self {
self.query_execution_factory = Some(query_execution_factory);
self
}
pub fn with_func_manager(mut self, func_manager: FuncMetaManagerRef) -> Self {
self.func_manager = Some(func_manager);
self
}
pub fn build(self) -> QueryResult<Arc<SimpleQueryDispatcher>> {
let input = self.input.ok_or_else(|| QueryError::BuildQueryDispatcher {
err: "missing input".to_string(),
})?;
let session_factory = self.session_factory.ok_or_else(|| QueryError::BuildQueryDispatcher {
err: "missing session_factory".to_string(),
})?;
let parser = self.parser.ok_or_else(|| QueryError::BuildQueryDispatcher {
err: "missing parser".to_string(),
})?;
let query_execution_factory = self.query_execution_factory.ok_or_else(|| QueryError::BuildQueryDispatcher {
err: "missing query_execution_factory".to_string(),
})?;
let func_manager = self.func_manager.ok_or_else(|| QueryError::BuildQueryDispatcher {
err: "missing func_manager".to_string(),
})?;
let default_table_provider = self.default_table_provider.ok_or_else(|| QueryError::BuildQueryDispatcher {
err: "missing default_table_provider".to_string(),
})?;
let dispatcher = Arc::new(SimpleQueryDispatcher {
input,
_default_table_provider: default_table_provider,
session_factory,
parser,
query_execution_factory,
func_manager,
});
Ok(dispatcher)
}
}

View File

@@ -0,0 +1 @@
pub mod manager;

View File

@@ -0,0 +1,46 @@
use std::sync::Arc;
use api::{
query::{
execution::{QueryExecutionFactory, QueryExecutionRef, QueryStateMachineRef},
logical_planner::Plan,
optimizer::Optimizer,
scheduler::SchedulerRef,
},
QueryError,
};
use async_trait::async_trait;
use super::query::SqlQueryExecution;
pub type QueryExecutionFactoryRef = Arc<dyn QueryExecutionFactory + Send + Sync>;
pub struct SqlQueryExecutionFactory {
optimizer: Arc<dyn Optimizer + Send + Sync>,
scheduler: SchedulerRef,
}
impl SqlQueryExecutionFactory {
#[inline(always)]
pub fn new(optimizer: Arc<dyn Optimizer + Send + Sync>, scheduler: SchedulerRef) -> Self {
Self { optimizer, scheduler }
}
}
#[async_trait]
impl QueryExecutionFactory for SqlQueryExecutionFactory {
async fn create_query_execution(
&self,
plan: Plan,
state_machine: QueryStateMachineRef,
) -> Result<QueryExecutionRef, QueryError> {
match plan {
Plan::Query(query_plan) => Ok(Arc::new(SqlQueryExecution::new(
state_machine,
query_plan,
self.optimizer.clone(),
self.scheduler.clone(),
))),
}
}
}

View File

@@ -0,0 +1,3 @@
pub mod factory;
pub mod query;
pub mod scheduler;

View File

@@ -0,0 +1,92 @@
use std::sync::Arc;
use api::query::execution::{Output, QueryExecution, QueryStateMachineRef};
use api::query::logical_planner::QueryPlan;
use api::query::optimizer::Optimizer;
use api::query::scheduler::SchedulerRef;
use api::{QueryError, QueryResult};
use async_trait::async_trait;
use futures::stream::AbortHandle;
use parking_lot::Mutex;
use tracing::debug;
pub struct SqlQueryExecution {
query_state_machine: QueryStateMachineRef,
plan: QueryPlan,
optimizer: Arc<dyn Optimizer + Send + Sync>,
scheduler: SchedulerRef,
abort_handle: Mutex<Option<AbortHandle>>,
}
impl SqlQueryExecution {
pub fn new(
query_state_machine: QueryStateMachineRef,
plan: QueryPlan,
optimizer: Arc<dyn Optimizer + Send + Sync>,
scheduler: SchedulerRef,
) -> Self {
Self {
query_state_machine,
plan,
optimizer,
scheduler,
abort_handle: Mutex::new(None),
}
}
async fn start(&self) -> QueryResult<Output> {
// begin optimize
self.query_state_machine.begin_optimize();
let physical_plan = self.optimizer.optimize(&self.plan, &self.query_state_machine.session).await?;
self.query_state_machine.end_optimize();
// begin schedule
self.query_state_machine.begin_schedule();
let stream = self
.scheduler
.schedule(physical_plan.clone(), self.query_state_machine.session.inner().task_ctx())
.await?
.stream();
debug!("Success build result stream.");
self.query_state_machine.end_schedule();
Ok(Output::StreamData(stream))
}
}
#[async_trait]
impl QueryExecution for SqlQueryExecution {
async fn start(&self) -> QueryResult<Output> {
let (task, abort_handle) = futures::future::abortable(self.start());
{
*self.abort_handle.lock() = Some(abort_handle);
}
task.await.map_err(|_| QueryError::Cancel)?
}
fn cancel(&self) -> QueryResult<()> {
debug!(
"cancel sql query execution: sql: {}, state: {:?}",
self.query_state_machine.query.content(),
self.query_state_machine.state()
);
// change state
self.query_state_machine.cancel();
// stop future task
if let Some(e) = self.abort_handle.lock().as_ref() {
e.abort()
};
debug!(
"canceled sql query execution: sql: {}, state: {:?}",
self.query_state_machine.query.content(),
self.query_state_machine.state()
);
Ok(())
}
}

View File

@@ -0,0 +1,22 @@
use std::sync::Arc;
use api::query::scheduler::{ExecutionResults, Scheduler};
use async_trait::async_trait;
use datafusion::error::DataFusionError;
use datafusion::execution::context::TaskContext;
use datafusion::physical_plan::{execute_stream, ExecutionPlan};
pub struct LocalScheduler {}
#[async_trait]
impl Scheduler for LocalScheduler {
async fn schedule(
&self,
plan: Arc<dyn ExecutionPlan>,
context: Arc<TaskContext>,
) -> Result<ExecutionResults, DataFusionError> {
let stream = execute_stream(plan, context)?;
Ok(ExecutionResults::new(stream))
}
}

View File

@@ -0,0 +1 @@
pub mod local;

View File

@@ -0,0 +1 @@
pub mod simple_func_manager;

View File

@@ -0,0 +1,63 @@
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use api::query::function::FunctionMetadataManager;
use api::{QueryError, QueryResult};
use datafusion::logical_expr::{AggregateUDF, ScalarUDF, WindowUDF};
pub type SimpleFunctionMetadataManagerRef = Arc<SimpleFunctionMetadataManager>;
#[derive(Debug, Default)]
pub struct SimpleFunctionMetadataManager {
/// Scalar functions that are registered with the context
pub scalar_functions: HashMap<String, Arc<ScalarUDF>>,
/// Aggregate functions registered in the context
pub aggregate_functions: HashMap<String, Arc<AggregateUDF>>,
/// Window functions registered in the context
pub window_functions: HashMap<String, Arc<WindowUDF>>,
}
impl FunctionMetadataManager for SimpleFunctionMetadataManager {
fn register_udf(&mut self, f: ScalarUDF) -> QueryResult<()> {
self.scalar_functions.insert(f.inner().name().to_uppercase(), Arc::new(f));
Ok(())
}
fn register_udaf(&mut self, f: AggregateUDF) -> QueryResult<()> {
self.aggregate_functions.insert(f.inner().name().to_uppercase(), Arc::new(f));
Ok(())
}
fn register_udwf(&mut self, f: WindowUDF) -> QueryResult<()> {
self.window_functions.insert(f.inner().name().to_uppercase(), Arc::new(f));
Ok(())
}
fn udf(&self, name: &str) -> QueryResult<Arc<ScalarUDF>> {
let result = self.scalar_functions.get(&name.to_uppercase());
result
.cloned()
.ok_or_else(|| QueryError::FunctionNotExists { name: name.to_string() })
}
fn udaf(&self, name: &str) -> QueryResult<Arc<AggregateUDF>> {
let result = self.aggregate_functions.get(&name.to_uppercase());
result
.cloned()
.ok_or_else(|| QueryError::FunctionNotExists { name: name.to_string() })
}
fn udwf(&self, name: &str) -> QueryResult<Arc<WindowUDF>> {
let result = self.window_functions.get(&name.to_uppercase());
result
.cloned()
.ok_or_else(|| QueryError::FunctionNotExists { name: name.to_string() })
}
fn udfs(&self) -> HashSet<String> {
self.scalar_functions.keys().cloned().collect()
}
}

View File

@@ -0,0 +1,164 @@
use std::sync::Arc;
use api::{
query::{
dispatcher::QueryDispatcher, execution::QueryStateMachineRef, logical_planner::Plan, session::SessionCtxFactory, Query,
},
server::dbms::{DatabaseManagerSystem, QueryHandle},
QueryResult,
};
use async_trait::async_trait;
use derive_builder::Builder;
use s3s::dto::SelectObjectContentInput;
use crate::{
dispatcher::manager::SimpleQueryDispatcherBuilder,
execution::{factory::SqlQueryExecutionFactory, scheduler::local::LocalScheduler},
function::simple_func_manager::SimpleFunctionMetadataManager,
metadata::base_table::BaseTableProvider,
sql::{optimizer::CascadeOptimizerBuilder, parser::DefaultParser},
};
#[derive(Builder)]
pub struct RustFSms<D: QueryDispatcher> {
// query dispatcher & query execution
query_dispatcher: Arc<D>,
}
#[async_trait]
impl<D> DatabaseManagerSystem for RustFSms<D>
where
D: QueryDispatcher,
{
async fn execute(&self, query: &Query) -> QueryResult<QueryHandle> {
let result = self.query_dispatcher.execute_query(query).await?;
Ok(QueryHandle::new(query.clone(), result))
}
async fn build_query_state_machine(&self, query: Query) -> QueryResult<QueryStateMachineRef> {
let query_state_machine = self.query_dispatcher.build_query_state_machine(query).await?;
Ok(query_state_machine)
}
async fn build_logical_plan(&self, query_state_machine: QueryStateMachineRef) -> QueryResult<Option<Plan>> {
let logical_plan = self.query_dispatcher.build_logical_plan(query_state_machine).await?;
Ok(logical_plan)
}
async fn execute_logical_plan(
&self,
logical_plan: Plan,
query_state_machine: QueryStateMachineRef,
) -> QueryResult<QueryHandle> {
let query = query_state_machine.query.clone();
let result = self
.query_dispatcher
.execute_logical_plan(logical_plan, query_state_machine)
.await?;
Ok(QueryHandle::new(query.clone(), result))
}
}
pub async fn make_rustfsms(input: SelectObjectContentInput, is_test: bool) -> QueryResult<impl DatabaseManagerSystem> {
// init Function Manager; we can define some UDFs here if needed
let func_manager = SimpleFunctionMetadataManager::default();
// TODO the session config needs to load the global system config
let session_factory = Arc::new(SessionCtxFactory { is_test });
let parser = Arc::new(DefaultParser::default());
let optimizer = Arc::new(CascadeOptimizerBuilder::default().build());
// TODO wrap this, and make num_threads configurable
let scheduler = Arc::new(LocalScheduler {});
let query_execution_factory = Arc::new(SqlQueryExecutionFactory::new(optimizer, scheduler));
let default_table_provider = Arc::new(BaseTableProvider::default());
let query_dispatcher = SimpleQueryDispatcherBuilder::default()
.with_input(input)
.with_func_manager(Arc::new(func_manager))
.with_default_table_provider(default_table_provider)
.with_session_factory(session_factory)
.with_parser(parser)
.with_query_execution_factory(query_execution_factory)
.build()?;
let mut builder = RustFSmsBuilder::default();
let db_server = builder.query_dispatcher(query_dispatcher).build().expect("build db server");
Ok(db_server)
}
#[cfg(test)]
mod tests {
use api::{
query::{Context, Query},
server::dbms::DatabaseManagerSystem,
};
use datafusion::{arrow::util::pretty, assert_batches_eq};
use s3s::dto::{
CSVInput, CSVOutput, ExpressionType, InputSerialization, OutputSerialization, SelectObjectContentInput,
SelectObjectContentRequest,
};
use crate::instance::make_rustfsms;
#[tokio::test]
#[ignore]
async fn test_simple_sql() {
let sql = "select * from S3Object";
let input = SelectObjectContentInput {
bucket: "dandan".to_string(),
expected_bucket_owner: None,
key: "test.csv".to_string(),
sse_customer_algorithm: None,
sse_customer_key: None,
sse_customer_key_md5: None,
request: SelectObjectContentRequest {
expression: sql.to_string(),
expression_type: ExpressionType::from_static("SQL"),
input_serialization: InputSerialization {
csv: Some(CSVInput::default()),
..Default::default()
},
output_serialization: OutputSerialization {
csv: Some(CSVOutput::default()),
..Default::default()
},
request_progress: None,
scan_range: None,
},
};
let db = make_rustfsms(input.clone(), true).await.unwrap();
let query = Query::new(Context { input }, sql.to_string());
let result = db.execute(&query).await.unwrap();
let results = result.result().chunk_result().await.unwrap().to_vec();
let expected = [
"+----------------+----------+----------+------------+----------+",
"| column_1 | column_2 | column_3 | column_4 | column_5 |",
"+----------------+----------+----------+------------+----------+",
"| id | name | age | department | salary |",
"| 1 | Alice | 25 | HR | 5000 |",
"| 2 | Bob | 30 | IT | 6000 |",
"| 3 | Charlie | 35 | Finance | 7000 |",
"| 4 | Diana | 22 | Marketing | 4500 |",
"| 5 | Eve | 28 | IT | 5500 |",
"| 6 | Frank | 40 | Finance | 8000 |",
"| 7 | Grace | 26 | HR | 5200 |",
"| 8 | Henry | 32 | IT | 6200 |",
"| 9 | Ivy | 24 | Marketing | 4800 |",
"| 10 | Jack | 38 | Finance | 7500 |",
"+----------------+----------+----------+------------+----------+",
];
assert_batches_eq!(expected, &results);
pretty::print_batches(&results).unwrap();
}
}

View File

@@ -0,0 +1,7 @@
pub mod data_source;
pub mod dispatcher;
pub mod execution;
pub mod function;
pub mod instance;
pub mod metadata;
pub mod sql;

View File

@@ -0,0 +1,17 @@
use std::sync::Arc;
use datafusion::common::Result as DFResult;
use datafusion::datasource::listing::ListingTable;
use crate::data_source::table_source::TableHandle;
use super::TableHandleProvider;
#[derive(Default)]
pub struct BaseTableProvider {}
impl TableHandleProvider for BaseTableProvider {
fn build_table_handle(&self, provider: Arc<ListingTable>) -> DFResult<TableHandle> {
Ok(TableHandle::External(provider))
}
}

View File

@@ -0,0 +1,126 @@
use std::sync::Arc;
use api::query::{function::FuncMetaManagerRef, session::SessionCtx};
use async_trait::async_trait;
use datafusion::arrow::datatypes::DataType;
use datafusion::common::Result as DFResult;
use datafusion::datasource::listing::ListingTable;
use datafusion::logical_expr::var_provider::is_system_variables;
use datafusion::logical_expr::{AggregateUDF, ScalarUDF, TableSource, WindowUDF};
use datafusion::variable::VarType;
use datafusion::{
config::ConfigOptions,
sql::{planner::ContextProvider, TableReference},
};
use crate::data_source::table_source::{TableHandle, TableSourceAdapter};
pub mod base_table;
#[async_trait]
pub trait ContextProviderExtension: ContextProvider {
fn get_table_source_(&self, name: TableReference) -> datafusion::common::Result<Arc<TableSourceAdapter>>;
}
pub type TableHandleProviderRef = Arc<dyn TableHandleProvider + Send + Sync>;
pub trait TableHandleProvider {
fn build_table_handle(&self, provider: Arc<ListingTable>) -> DFResult<TableHandle>;
}
pub struct MetadataProvider {
provider: Arc<ListingTable>,
session: SessionCtx,
config_options: ConfigOptions,
func_manager: FuncMetaManagerRef,
current_session_table_provider: TableHandleProviderRef,
}
impl MetadataProvider {
#[allow(clippy::too_many_arguments)]
pub fn new(
provider: Arc<ListingTable>,
current_session_table_provider: TableHandleProviderRef,
func_manager: FuncMetaManagerRef,
session: SessionCtx,
) -> Self {
Self {
provider,
current_session_table_provider,
config_options: session.inner().config_options().clone(),
session,
func_manager,
}
}
fn build_table_handle(&self) -> datafusion::common::Result<TableHandle> {
self.current_session_table_provider.build_table_handle(self.provider.clone())
}
}
impl ContextProviderExtension for MetadataProvider {
fn get_table_source_(&self, table_ref: TableReference) -> datafusion::common::Result<Arc<TableSourceAdapter>> {
let name = table_ref.clone().resolve("", "");
let table_name = &*name.table;
let table_handle = self.build_table_handle()?;
Ok(Arc::new(TableSourceAdapter::try_new(table_ref.clone(), table_name, table_handle)?))
}
}
impl ContextProvider for MetadataProvider {
fn get_function_meta(&self, name: &str) -> Option<Arc<ScalarUDF>> {
self.func_manager
.udf(name)
.ok()
.or(self.session.inner().scalar_functions().get(name).cloned())
}
fn get_aggregate_meta(&self, name: &str) -> Option<Arc<AggregateUDF>> {
self.func_manager.udaf(name).ok()
}
fn get_variable_type(&self, variable_names: &[String]) -> Option<DataType> {
if variable_names.is_empty() {
return None;
}
let var_type = if is_system_variables(variable_names) {
VarType::System
} else {
VarType::UserDefined
};
self.session
.inner()
.execution_props()
.get_var_provider(var_type)
.and_then(|p| p.get_type(variable_names))
}
fn options(&self) -> &ConfigOptions {
// TODO refactor
&self.config_options
}
fn get_window_meta(&self, name: &str) -> Option<Arc<WindowUDF>> {
self.func_manager.udwf(name).ok()
}
fn get_table_source(&self, name: TableReference) -> DFResult<Arc<dyn TableSource>> {
Ok(self.get_table_source_(name)?)
}
fn udf_names(&self) -> Vec<String> {
todo!()
}
fn udaf_names(&self) -> Vec<String> {
todo!()
}
fn udwf_names(&self) -> Vec<String> {
todo!()
}
}

View File

@@ -0,0 +1,33 @@
use api::query::analyzer::Analyzer;
use api::query::session::SessionCtx;
use api::QueryResult;
use datafusion::logical_expr::LogicalPlan;
use datafusion::optimizer::analyzer::Analyzer as DFAnalyzer;
pub struct DefaultAnalyzer {
inner: DFAnalyzer,
}
impl DefaultAnalyzer {
pub fn new() -> Self {
let analyzer = DFAnalyzer::default();
        // additional analyzer rules can be added here
Self { inner: analyzer }
}
}
impl Default for DefaultAnalyzer {
fn default() -> Self {
Self::new()
}
}
impl Analyzer for DefaultAnalyzer {
fn analyze(&self, plan: &LogicalPlan, session: &SessionCtx) -> QueryResult<LogicalPlan> {
let plan = self
.inner
.execute_and_check(plan.to_owned(), session.inner().config_options(), |_, _| {})?;
Ok(plan)
}
}

View File

@@ -0,0 +1,18 @@
use datafusion::sql::sqlparser::dialect::Dialect;
#[derive(Debug, Default)]
pub struct RustFsDialect;
impl Dialect for RustFsDialect {
fn is_identifier_start(&self, ch: char) -> bool {
ch.is_alphabetic() || ch == '_' || ch == '#' || ch == '@'
}
fn is_identifier_part(&self, ch: char) -> bool {
ch.is_alphabetic() || ch.is_ascii_digit() || ch == '@' || ch == '$' || ch == '#' || ch == '_'
}
fn supports_group_by_expr(&self) -> bool {
true
}
}
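A minimal test-style sketch (illustrative only, not part of this diff) of what the dialect defined above accepts; it assumes a tests submodule alongside RustFsDialect:

#[cfg(test)]
mod tests {
    use super::RustFsDialect;
    use datafusion::sql::sqlparser::dialect::Dialect;

    #[test]
    fn accepts_extended_identifiers() {
        let dialect = RustFsDialect;
        // '#' and '@' may start identifiers, '$' may appear inside them,
        // and GROUP BY expressions are accepted.
        assert!(dialect.is_identifier_start('#'));
        assert!(dialect.is_identifier_part('$'));
        assert!(dialect.supports_group_by_expr());
    }
}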

View File

@@ -0,0 +1,2 @@
pub mod optimizer;
pub mod planner;

View File

@@ -0,0 +1,111 @@
use std::sync::Arc;
use api::{
query::{analyzer::AnalyzerRef, logical_planner::QueryPlan, session::SessionCtx},
QueryResult,
};
use datafusion::{
execution::SessionStateBuilder,
logical_expr::LogicalPlan,
optimizer::{
common_subexpr_eliminate::CommonSubexprEliminate, decorrelate_predicate_subquery::DecorrelatePredicateSubquery,
eliminate_cross_join::EliminateCrossJoin, eliminate_duplicated_expr::EliminateDuplicatedExpr,
eliminate_filter::EliminateFilter, eliminate_join::EliminateJoin, eliminate_limit::EliminateLimit,
eliminate_outer_join::EliminateOuterJoin, extract_equijoin_predicate::ExtractEquijoinPredicate,
filter_null_join_keys::FilterNullJoinKeys, propagate_empty_relation::PropagateEmptyRelation,
push_down_filter::PushDownFilter, push_down_limit::PushDownLimit,
replace_distinct_aggregate::ReplaceDistinctWithAggregate, scalar_subquery_to_join::ScalarSubqueryToJoin,
simplify_expressions::SimplifyExpressions, single_distinct_to_groupby::SingleDistinctToGroupBy,
unwrap_cast_in_comparison::UnwrapCastInComparison, OptimizerRule,
},
};
use tracing::debug;
use crate::sql::analyzer::DefaultAnalyzer;
pub trait LogicalOptimizer: Send + Sync {
fn optimize(&self, plan: &QueryPlan, session: &SessionCtx) -> QueryResult<LogicalPlan>;
fn inject_optimizer_rule(&mut self, optimizer_rule: Arc<dyn OptimizerRule + Send + Sync>);
}
pub struct DefaultLogicalOptimizer {
    // kept aligned with DataFusion's analyzer/optimizer pipeline
    // TODO refactor
analyzer: AnalyzerRef,
rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
}
impl DefaultLogicalOptimizer {
#[allow(dead_code)]
fn with_optimizer_rules(mut self, rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>) -> Self {
self.rules = rules;
self
}
}
impl Default for DefaultLogicalOptimizer {
fn default() -> Self {
let analyzer = Arc::new(DefaultAnalyzer::default());
// additional optimizer rule
let rules: Vec<Arc<dyn OptimizerRule + Send + Sync>> = vec![
// df default rules start
Arc::new(SimplifyExpressions::new()),
Arc::new(UnwrapCastInComparison::new()),
Arc::new(ReplaceDistinctWithAggregate::new()),
Arc::new(EliminateJoin::new()),
Arc::new(DecorrelatePredicateSubquery::new()),
Arc::new(ScalarSubqueryToJoin::new()),
Arc::new(ExtractEquijoinPredicate::new()),
// simplify expressions does not simplify expressions in subqueries, so we
// run it again after running the optimizations that potentially converted
// subqueries to joins
Arc::new(SimplifyExpressions::new()),
Arc::new(EliminateDuplicatedExpr::new()),
Arc::new(EliminateFilter::new()),
Arc::new(EliminateCrossJoin::new()),
Arc::new(CommonSubexprEliminate::new()),
Arc::new(EliminateLimit::new()),
Arc::new(PropagateEmptyRelation::new()),
Arc::new(FilterNullJoinKeys::default()),
Arc::new(EliminateOuterJoin::new()),
            // Filters can't be pushed down past Limits, so PushDownFilter must run after PushDownLimit
Arc::new(PushDownLimit::new()),
Arc::new(PushDownFilter::new()),
Arc::new(SingleDistinctToGroupBy::new()),
            // The previous optimizations added expressions and projections
            // that might benefit from the following rules
Arc::new(SimplifyExpressions::new()),
Arc::new(UnwrapCastInComparison::new()),
Arc::new(CommonSubexprEliminate::new()),
            // PushDownProjection can push down Projections through Limits, so run PushDownLimit again.
Arc::new(PushDownLimit::new()),
// df default rules end
            // custom rules can be added here
];
Self { analyzer, rules }
}
}
impl LogicalOptimizer for DefaultLogicalOptimizer {
fn optimize(&self, plan: &QueryPlan, session: &SessionCtx) -> QueryResult<LogicalPlan> {
let analyzed_plan = { self.analyzer.analyze(&plan.df_plan, session)? };
debug!("Analyzed logical plan:\n{}\n", plan.df_plan.display_indent_schema(),);
        let optimized_plan = {
SessionStateBuilder::new_from_existing(session.inner().clone())
.with_optimizer_rules(self.rules.clone())
.build()
.optimize(&analyzed_plan)?
};
        Ok(optimized_plan)
}
fn inject_optimizer_rule(&mut self, optimizer_rule: Arc<dyn OptimizerRule + Send + Sync>) {
self.rules.push(optimizer_rule);
}
}
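A hedged sketch of how a caller could extend the defaults through inject_optimizer_rule before sharing the optimizer; the helper function name is hypothetical, and PushDownFilter is reused only because it is already imported above:

fn build_logical_optimizer() -> Arc<dyn LogicalOptimizer> {
    // Start from the default rule set, then append one more pass of an
    // already-imported DataFusion rule; injected rules run after the defaults.
    let mut optimizer = DefaultLogicalOptimizer::default();
    optimizer.inject_optimizer_rule(Arc::new(PushDownFilter::new()));
    Arc::new(optimizer)
}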

View File

@@ -0,0 +1,3 @@
use crate::sql::planner::SqlPlanner;
pub type DefaultLogicalPlanner<'a, S> = SqlPlanner<'a, S>;

View File

@@ -0,0 +1,7 @@
pub mod analyzer;
pub mod dialect;
pub mod logical;
pub mod optimizer;
pub mod parser;
pub mod physical;
pub mod planner;

View File

@@ -0,0 +1,82 @@
use std::sync::Arc;
use api::{
query::{logical_planner::QueryPlan, optimizer::Optimizer, physical_planner::PhysicalPlanner, session::SessionCtx},
QueryResult,
};
use async_trait::async_trait;
use datafusion::physical_plan::{displayable, ExecutionPlan};
use tracing::debug;
use super::{
logical::optimizer::{DefaultLogicalOptimizer, LogicalOptimizer},
physical::{optimizer::PhysicalOptimizer, planner::DefaultPhysicalPlanner},
};
pub struct CascadeOptimizer {
logical_optimizer: Arc<dyn LogicalOptimizer + Send + Sync>,
physical_planner: Arc<dyn PhysicalPlanner + Send + Sync>,
physical_optimizer: Arc<dyn PhysicalOptimizer + Send + Sync>,
}
#[async_trait]
impl Optimizer for CascadeOptimizer {
async fn optimize(&self, plan: &QueryPlan, session: &SessionCtx) -> QueryResult<Arc<dyn ExecutionPlan>> {
debug!("Original logical plan:\n{}\n", plan.df_plan.display_indent_schema(),);
let optimized_logical_plan = self.logical_optimizer.optimize(plan, session)?;
debug!("Final logical plan:\n{}\n", optimized_logical_plan.display_indent_schema(),);
let physical_plan = {
self.physical_planner
.create_physical_plan(&optimized_logical_plan, session)
.await?
};
debug!("Original physical plan:\n{}\n", displayable(physical_plan.as_ref()).indent(false));
let optimized_physical_plan = { self.physical_optimizer.optimize(physical_plan, session)? };
Ok(optimized_physical_plan)
}
}
#[derive(Default)]
pub struct CascadeOptimizerBuilder {
logical_optimizer: Option<Arc<dyn LogicalOptimizer + Send + Sync>>,
physical_planner: Option<Arc<dyn PhysicalPlanner + Send + Sync>>,
physical_optimizer: Option<Arc<dyn PhysicalOptimizer + Send + Sync>>,
}
impl CascadeOptimizerBuilder {
pub fn with_logical_optimizer(mut self, logical_optimizer: Arc<dyn LogicalOptimizer + Send + Sync>) -> Self {
self.logical_optimizer = Some(logical_optimizer);
self
}
pub fn with_physical_planner(mut self, physical_planner: Arc<dyn PhysicalPlanner + Send + Sync>) -> Self {
self.physical_planner = Some(physical_planner);
self
}
pub fn with_physical_optimizer(mut self, physical_optimizer: Arc<dyn PhysicalOptimizer + Send + Sync>) -> Self {
self.physical_optimizer = Some(physical_optimizer);
self
}
pub fn build(self) -> CascadeOptimizer {
let default_logical_optimizer = Arc::new(DefaultLogicalOptimizer::default());
let default_physical_planner = Arc::new(DefaultPhysicalPlanner::default());
let logical_optimizer = self.logical_optimizer.unwrap_or(default_logical_optimizer);
let physical_planner = self.physical_planner.unwrap_or_else(|| default_physical_planner.clone());
let physical_optimizer = self.physical_optimizer.unwrap_or(default_physical_planner);
CascadeOptimizer {
logical_optimizer,
physical_planner,
physical_optimizer,
}
}
}
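For reference, a short sketch (hypothetical call site) of driving the builder: make_rustfsms uses the bare CascadeOptimizerBuilder::default().build() form, while overriding one stage keeps the defaults for the other two:

fn build_optimizer() -> Arc<CascadeOptimizer> {
    // Custom logical optimizer; the physical planner and physical optimizer
    // fall back to DefaultPhysicalPlanner inside build().
    let optimizer = CascadeOptimizerBuilder::default()
        .with_logical_optimizer(Arc::new(DefaultLogicalOptimizer::default()))
        .build();
    Arc::new(optimizer)
}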

View File

@@ -0,0 +1,92 @@
use std::{collections::VecDeque, fmt::Display};
use api::{
query::{ast::ExtStatement, parser::Parser as RustFsParser},
ParserSnafu,
};
use datafusion::sql::sqlparser::{
dialect::Dialect,
parser::{Parser, ParserError},
tokenizer::{Token, Tokenizer},
};
use snafu::ResultExt;
use super::dialect::RustFsDialect;
pub type Result<T, E = ParserError> = std::result::Result<T, E>;
// Use `Parser::expected` instead, if possible
macro_rules! parser_err {
($MSG:expr) => {
Err(ParserError::ParserError($MSG.to_string()))
};
}
#[derive(Default)]
pub struct DefaultParser {}
impl RustFsParser for DefaultParser {
fn parse(&self, sql: &str) -> api::QueryResult<VecDeque<ExtStatement>> {
ExtParser::parse_sql(sql).context(ParserSnafu)
}
}
/// SQL Parser
pub struct ExtParser<'a> {
parser: Parser<'a>,
}
impl<'a> ExtParser<'a> {
    /// Create a parser for the given SQL text using the specified dialect
fn new_with_dialect(sql: &str, dialect: &'a dyn Dialect) -> Result<Self> {
let mut tokenizer = Tokenizer::new(dialect, sql);
let tokens = tokenizer.tokenize()?;
Ok(ExtParser {
parser: Parser::new(dialect).with_tokens(tokens),
})
}
/// Parse a SQL statement and produce a set of statements
pub fn parse_sql(sql: &str) -> Result<VecDeque<ExtStatement>> {
let dialect = &RustFsDialect {};
ExtParser::parse_sql_with_dialect(sql, dialect)
}
/// Parse a SQL statement and produce a set of statements
pub fn parse_sql_with_dialect(sql: &str, dialect: &dyn Dialect) -> Result<VecDeque<ExtStatement>> {
let mut parser = ExtParser::new_with_dialect(sql, dialect)?;
let mut stmts = VecDeque::new();
let mut expecting_statement_delimiter = false;
loop {
// ignore empty statements (between successive statement delimiters)
while parser.parser.consume_token(&Token::SemiColon) {
expecting_statement_delimiter = false;
}
if parser.parser.peek_token() == Token::EOF {
break;
}
if expecting_statement_delimiter {
return parser.expected("end of statement", parser.parser.peek_token());
}
let statement = parser.parse_statement()?;
stmts.push_back(statement);
expecting_statement_delimiter = true;
}
// debug!("Parser sql: {}, stmts: {:#?}", sql, stmts);
Ok(stmts)
}
    /// Parse a single statement
fn parse_statement(&mut self) -> Result<ExtStatement> {
Ok(ExtStatement::SqlStatement(Box::new(self.parser.parse_statement()?)))
}
// Report unexpected token
fn expected<T>(&self, expected: &str, found: impl Display) -> Result<T> {
parser_err!(format!("Expected {}, found: {}", expected, found))
}
}
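A minimal usage sketch of the two entry points defined above (illustrative; the SQL string is just an example, and it assumes the snippet sits in this module so the imported Parser trait and QueryResult alias are in scope):

fn parse_example() -> api::QueryResult<()> {
    // DefaultParser implements the api-level Parser trait and converts errors
    // via ParserSnafu; ExtParser returns sqlparser errors directly.
    let stmts = DefaultParser::default().parse("SELECT * FROM S3Object LIMIT 10")?;
    assert_eq!(stmts.len(), 1);
    Ok(())
}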

View File

@@ -0,0 +1,2 @@
pub mod optimizer;
pub mod planner;

View File

@@ -0,0 +1,12 @@
use std::sync::Arc;
use api::query::session::SessionCtx;
use api::QueryResult;
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::ExecutionPlan;
pub trait PhysicalOptimizer {
fn optimize(&self, plan: Arc<dyn ExecutionPlan>, session: &SessionCtx) -> QueryResult<Arc<dyn ExecutionPlan>>;
fn inject_optimizer_rule(&mut self, optimizer_rule: Arc<dyn PhysicalOptimizerRule + Send + Sync>);
}

View File

@@ -0,0 +1,104 @@
use std::sync::Arc;
use api::query::physical_planner::PhysicalPlanner;
use api::query::session::SessionCtx;
use api::QueryResult;
use async_trait::async_trait;
use datafusion::execution::SessionStateBuilder;
use datafusion::logical_expr::LogicalPlan;
use datafusion::physical_optimizer::aggregate_statistics::AggregateStatistics;
use datafusion::physical_optimizer::coalesce_batches::CoalesceBatches;
use datafusion::physical_optimizer::join_selection::JoinSelection;
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_planner::{
DefaultPhysicalPlanner as DFDefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner as DFPhysicalPlanner,
};
use super::optimizer::PhysicalOptimizer;
pub struct DefaultPhysicalPlanner {
ext_physical_transform_rules: Vec<Arc<dyn ExtensionPlanner + Send + Sync>>,
/// Responsible for optimizing a physical execution plan
ext_physical_optimizer_rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>,
}
impl DefaultPhysicalPlanner {
#[allow(dead_code)]
fn with_physical_transform_rules(mut self, rules: Vec<Arc<dyn ExtensionPlanner + Send + Sync>>) -> Self {
self.ext_physical_transform_rules = rules;
self
}
#[allow(dead_code)]
fn with_optimizer_rules(mut self, rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>) -> Self {
self.ext_physical_optimizer_rules = rules;
self
}
}
impl Default for DefaultPhysicalPlanner {
fn default() -> Self {
let ext_physical_transform_rules: Vec<Arc<dyn ExtensionPlanner + Send + Sync>> = vec![
            // rules can be added here
];
// We need to take care of the rule ordering. They may influence each other.
let ext_physical_optimizer_rules: Vec<Arc<dyn PhysicalOptimizerRule + Sync + Send>> = vec![
Arc::new(AggregateStatistics::new()),
// Statistics-based join selection will change the Auto mode to a real join implementation,
// like collect left, or hash join, or future sort merge join, which will influence the
// EnforceDistribution and EnforceSorting rules as they decide whether to add additional
// repartitioning and local sorting steps to meet distribution and ordering requirements.
// Therefore, it should run before EnforceDistribution and EnforceSorting.
Arc::new(JoinSelection::new()),
// The CoalesceBatches rule will not influence the distribution and ordering of the
// whole plan tree. Therefore, to avoid influencing other rules, it should run last.
Arc::new(CoalesceBatches::new()),
];
Self {
ext_physical_transform_rules,
ext_physical_optimizer_rules,
}
}
}
#[async_trait]
impl PhysicalPlanner for DefaultPhysicalPlanner {
async fn create_physical_plan(
&self,
logical_plan: &LogicalPlan,
session: &SessionCtx,
) -> QueryResult<Arc<dyn ExecutionPlan>> {
        // Inject the extended physical optimizer rules into DataFusion's session state
let new_state = SessionStateBuilder::new_from_existing(session.inner().clone())
.with_physical_optimizer_rules(self.ext_physical_optimizer_rules.clone())
.build();
        // Build DataFusion's physical planner with the extended physical transform rules
let planner = DFDefaultPhysicalPlanner::with_extension_planners(self.ext_physical_transform_rules.clone());
        // Run DataFusion's physical planning and optimization
planner
.create_physical_plan(logical_plan, &new_state)
.await
.map_err(|e| e.into())
}
fn inject_physical_transform_rule(&mut self, rule: Arc<dyn ExtensionPlanner + Send + Sync>) {
self.ext_physical_transform_rules.push(rule)
}
}
impl PhysicalOptimizer for DefaultPhysicalPlanner {
fn optimize(&self, plan: Arc<dyn ExecutionPlan>, _session: &SessionCtx) -> QueryResult<Arc<dyn ExecutionPlan>> {
Ok(plan)
}
fn inject_optimizer_rule(&mut self, optimizer_rule: Arc<dyn PhysicalOptimizerRule + Send + Sync>) {
self.ext_physical_optimizer_rules.push(optimizer_rule);
}
}
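As on the logical side, a hedged sketch (hypothetical helper) of installing one extra physical rule before the planner is shared; injected rules end up in the session state built inside create_physical_plan:

fn build_physical_planner() -> Arc<DefaultPhysicalPlanner> {
    let mut planner = DefaultPhysicalPlanner::default();
    // Reuse an already-imported rule purely to illustrate the API.
    planner.inject_optimizer_rule(Arc::new(CoalesceBatches::new()));
    Arc::new(planner)
}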

View File

@@ -0,0 +1,60 @@
use api::{
query::{
ast::ExtStatement,
logical_planner::{LogicalPlanner, Plan, QueryPlan},
session::SessionCtx,
},
QueryError, QueryResult,
};
use async_recursion::async_recursion;
use async_trait::async_trait;
use datafusion::sql::{planner::SqlToRel, sqlparser::ast::Statement};
use crate::metadata::ContextProviderExtension;
pub struct SqlPlanner<'a, S: ContextProviderExtension> {
_schema_provider: &'a S,
df_planner: SqlToRel<'a, S>,
}
#[async_trait]
impl<S: ContextProviderExtension + Send + Sync> LogicalPlanner for SqlPlanner<'_, S> {
async fn create_logical_plan(&self, statement: ExtStatement, session: &SessionCtx) -> QueryResult<Plan> {
let plan = { self.statement_to_plan(statement, session).await? };
Ok(plan)
}
}
impl<'a, S: ContextProviderExtension + Send + Sync + 'a> SqlPlanner<'a, S> {
/// Create a new query planner
pub fn new(schema_provider: &'a S) -> Self {
SqlPlanner {
_schema_provider: schema_provider,
df_planner: SqlToRel::new(schema_provider),
}
}
    /// Generate a logical plan from an extended SQL statement (`ExtStatement`)
#[async_recursion]
pub(crate) async fn statement_to_plan(&self, statement: ExtStatement, session: &SessionCtx) -> QueryResult<Plan> {
match statement {
ExtStatement::SqlStatement(stmt) => self.df_sql_to_plan(*stmt, session).await,
}
}
async fn df_sql_to_plan(&self, stmt: Statement, _session: &SessionCtx) -> QueryResult<Plan> {
match stmt {
Statement::Query(_) => {
let df_plan = self.df_planner.sql_statement_to_plan(stmt)?;
let plan = Plan::Query(QueryPlan {
df_plan,
is_tag_scan: false,
});
Ok(plan)
}
_ => Err(QueryError::NotImplemented { err: stmt.to_string() }),
}
}
}

View File

@@ -1 +1 @@
curl -L "https://dl.rustfs.com/console/rustfs-console-latest.zip" -o tempfile.zip && unzip -o tempfile.zip -d ./rustfs/static && rm tempfile.zip
curl -L "https://dl.rustfs.com/artifacts/console/rustfs-console-latest.zip" -o tempfile.zip && unzip -o tempfile.zip -d ./rustfs/static && rm tempfile.zip