Signed-off-by: junxiang Mu <1948535941@qq.com>
junxiang Mu
2025-03-14 03:26:32 +00:00
parent 0b270bf0cc
commit 83e2c8f69f
17 changed files with 105 additions and 53 deletions

View File

@@ -151,7 +151,7 @@ fn read_drive_stats(stats_file: &str) -> Result<IOStats> {
fn read_stat(file_name: &str) -> Result<Vec<u64>> {
// Open the file
let path = Path::new(file_name);
let file = File::open(&path)?;
let file = File::open(path)?;
// Create a BufReader
let reader = io::BufReader::new(file);
@@ -161,7 +161,8 @@ fn read_stat(file_name: &str) -> Result<Vec<u64>> {
if let Some(line) = reader.lines().next() {
let line = line?;
// Split the line and parse each token as u64
for token in line.trim().split_whitespace() {
// https://rust-lang.github.io/rust-clippy/master/index.html#trim_split_whitespace
for token in line.split_whitespace() {
let ui64: u64 = token.parse()?;
stats.push(ui64);
}
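
For context on the clippy link above: `split_whitespace()` already skips leading and trailing whitespace, so the `trim()` it replaces was redundant. A small standalone sketch (the stat line below is made up for illustration, not taken from this repo):

fn main() {
    // Fabricated numbers-only stat line (similar in shape to /sys/block/<dev>/stat),
    // padded with leading and trailing whitespace.
    let line = "   8214 1456 403825 3012   ";
    let with_trim: Vec<&str> = line.trim().split_whitespace().collect();
    let without_trim: Vec<&str> = line.split_whitespace().collect();
    // Both forms yield exactly the same tokens.
    assert_eq!(with_trim, without_trim);
    // Parse each token as u64, as read_stat does.
    let stats: Vec<u64> = without_trim.iter().map(|t| t.parse().unwrap()).collect();
    println!("{:?}", stats); // [8214, 1456, 403825, 3012]
}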

View File

@@ -6,9 +6,12 @@ use crate::auth::get_condition_values;
use crate::storage::access::ReqInfo;
use api::query::Context;
use api::query::Query;
use api::server::dbms::DatabaseManagerSystem;
use bytes::Bytes;
use common::error::Result;
use datafusion::arrow::csv::WriterBuilder as CsvWriterBuilder;
use datafusion::arrow::json::writer::JsonArray;
use datafusion::arrow::json::WriterBuilder as JsonWriterBuilder;
use ecstore::bucket::error::BucketMetadataError;
use ecstore::bucket::metadata::BUCKET_LIFECYCLE_CONFIG;
use ecstore::bucket::metadata::BUCKET_NOTIFICATION_CONFIG;
@@ -43,7 +46,6 @@ use futures::pin_mut;
use futures::{Stream, StreamExt};
use http::HeaderMap;
use iam::policy::action::Action;
use api::server::dbms::DatabaseManagerSystem;
use iam::policy::action::S3Action;
use lazy_static::lazy_static;
use log::warn;
@@ -61,10 +63,10 @@ use s3s::S3ErrorCode;
use s3s::S3Result;
use s3s::S3;
use s3s::{S3Request, S3Response};
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use std::fmt::Debug;
use std::str::FromStr;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tokio_util::io::ReaderStream;
use tokio_util::io::StreamReader;
use tracing::debug;
@@ -1878,13 +1880,12 @@ impl S3 for FS {
let input = req.input;
info!("{:?}", input);
let db = make_cnosdbms(input).await.map_err(|_| {
s3_error!(InternalError)
})?;
let query = Query::new(Context {input: input.clone()}, input.request.expression);
let result = db.execute(&query).await.map_err(|_| {
let db = make_cnosdbms(input.clone()).await.map_err(|e| {
error!("make db failed, {}", e.to_string());
s3_error!(InternalError)
})?;
let query = Query::new(Context { input: input.clone() }, input.request.expression);
let result = db.execute(&query).await.map_err(|_| s3_error!(InternalError))?;
let results = result.result().chunk_result().await.unwrap().to_vec();

View File

@@ -32,6 +32,12 @@ pub enum QueryError {
#[snafu(display("{}", source))]
Parser { source: ParserError },
#[snafu(display("Udf not exists, name:{}.", name))]
FunctionNotExists { name: String },
#[snafu(display("Udf already exists, name:{}.", name))]
FunctionExists { name: String },
}
impl From<DataFusionError> for QueryError {

View File

@@ -79,7 +79,7 @@ impl ObjectStore for EcObjectStore {
let stream = stream::unfold(reader.stream, |mut blob| async move {
match blob.next().await {
Some(Ok(chunk)) => {
let bytes = Bytes::from(chunk);
let bytes = chunk;
Some((Ok(bytes), blob))
}
_ => None,

View File

@@ -1 +1 @@
pub mod data_source;
pub mod table_source;

View File

@@ -1,7 +1,6 @@
use std::{
collections::HashMap,
pin::Pin,
sync::{Arc, Mutex},
sync::Arc,
task::{Context, Poll},
};
@@ -31,14 +30,10 @@ use datafusion::{
};
use futures::{Stream, StreamExt};
use s3s::dto::SelectObjectContentInput;
use tokio::task::JoinHandle;
use crate::{
execution::factory::QueryExecutionFactoryRef,
metadata::{
base_table::BaseTableProvider,
ContextProviderExtension, MetadataProvider, TableHandleProviderRef,
},
metadata::{base_table::BaseTableProvider, ContextProviderExtension, MetadataProvider, TableHandleProviderRef},
sql::logical::planner::DefaultLogicalPlanner,
};
@@ -46,7 +41,7 @@ use crate::{
pub struct SimpleQueryDispatcher {
input: SelectObjectContentInput,
// client for default tenant
default_table_provider: TableHandleProviderRef,
_default_table_provider: TableHandleProviderRef,
session_factory: Arc<SessionCtxFactory>,
// parser
parser: Arc<dyn Parser + Send + Sync>,
@@ -264,7 +259,7 @@ impl SimpleQueryDispatcherBuilder {
let dispatcher = Arc::new(SimpleQueryDispatcher {
input,
default_table_provider,
_default_table_provider: default_table_provider,
session_factory,
parser,
query_execution_factory,

View File

@@ -0,0 +1 @@
pub mod simple_func_manager;

View File

@@ -0,0 +1,63 @@
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use api::query::function::FunctionMetadataManager;
use api::{QueryError, QueryResult};
use datafusion::logical_expr::{AggregateUDF, ScalarUDF, WindowUDF};
pub type SimpleFunctionMetadataManagerRef = Arc<SimpleFunctionMetadataManager>;
#[derive(Debug, Default)]
pub struct SimpleFunctionMetadataManager {
/// Scalar functions that are registered with the context
pub scalar_functions: HashMap<String, Arc<ScalarUDF>>,
/// Aggregate functions registered in the context
pub aggregate_functions: HashMap<String, Arc<AggregateUDF>>,
/// Window functions registered in the context
pub window_functions: HashMap<String, Arc<WindowUDF>>,
}
impl FunctionMetadataManager for SimpleFunctionMetadataManager {
fn register_udf(&mut self, f: ScalarUDF) -> QueryResult<()> {
self.scalar_functions.insert(f.inner().name().to_uppercase(), Arc::new(f));
Ok(())
}
fn register_udaf(&mut self, f: AggregateUDF) -> QueryResult<()> {
self.aggregate_functions.insert(f.inner().name().to_uppercase(), Arc::new(f));
Ok(())
}
fn register_udwf(&mut self, f: WindowUDF) -> QueryResult<()> {
self.window_functions.insert(f.inner().name().to_uppercase(), Arc::new(f));
Ok(())
}
fn udf(&self, name: &str) -> QueryResult<Arc<ScalarUDF>> {
let result = self.scalar_functions.get(&name.to_uppercase());
result
.cloned()
.ok_or_else(|| QueryError::FunctionNotExists { name: name.to_string() })
}
fn udaf(&self, name: &str) -> QueryResult<Arc<AggregateUDF>> {
let result = self.aggregate_functions.get(&name.to_uppercase());
result
.cloned()
.ok_or_else(|| QueryError::FunctionNotExists { name: name.to_string() })
}
fn udwf(&self, name: &str) -> QueryResult<Arc<WindowUDF>> {
let result = self.window_functions.get(&name.to_uppercase());
result
.cloned()
.ok_or_else(|| QueryError::FunctionNotExists { name: name.to_string() })
}
fn udfs(&self) -> HashSet<String> {
self.scalar_functions.keys().cloned().collect()
}
}
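
A usage note on the design above: names are uppercased both when a UDF is registered and when it is looked up, so function resolution is effectively case-insensitive. A simplified, self-contained stand-in showing the same normalization (plain `String` payload instead of the datafusion UDF types; `CaseInsensitiveRegistry` and `parse_ip` are illustrative names, not part of this codebase):

use std::collections::HashMap;
use std::sync::Arc;

// Simplified stand-in for the scalar-function map above: entries are keyed by the
// uppercased name, and lookups uppercase the requested name as well.
#[derive(Default)]
struct CaseInsensitiveRegistry {
    scalar_functions: HashMap<String, Arc<String>>, // Arc<ScalarUDF> in the real manager
}

impl CaseInsensitiveRegistry {
    fn register(&mut self, name: &str, body: String) {
        self.scalar_functions.insert(name.to_uppercase(), Arc::new(body));
    }

    fn lookup(&self, name: &str) -> Option<Arc<String>> {
        self.scalar_functions.get(&name.to_uppercase()).cloned()
    }
}

fn main() {
    let mut reg = CaseInsensitiveRegistry::default();
    reg.register("parse_ip", "hypothetical udf body".to_string());
    // The same entry is found regardless of the case used at the call site.
    assert!(reg.lookup("PARSE_IP").is_some());
    assert!(reg.lookup("parse_ip").is_some());
    // Unknown names come back empty, which the real manager maps to FunctionNotExists.
    assert!(reg.lookup("missing_udf").is_none());
}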

View File

@@ -14,6 +14,7 @@ use s3s::dto::SelectObjectContentInput;
use crate::{
dispatcher::manager::SimpleQueryDispatcherBuilder,
execution::{factory::SqlQueryExecutionFactory, scheduler::local::LocalScheduler},
function::simple_func_manager::SimpleFunctionMetadataManager,
metadata::base_table::BaseTableProvider,
sql::{optimizer::CascadeOptimizerBuilder, parser::DefaultParser},
};
@@ -63,6 +64,8 @@ where
}
pub async fn make_cnosdbms(input: SelectObjectContentInput) -> QueryResult<impl DatabaseManagerSystem> {
// init the Function Manager; we can define UDFs here if needed
let func_manager = SimpleFunctionMetadataManager::default();
// TODO: session config needs to load the global system config
let session_factory = Arc::new(SessionCtxFactory {});
let parser = Arc::new(DefaultParser::default());
@@ -76,6 +79,7 @@ pub async fn make_cnosdbms(input: SelectObjectContentInput) -> QueryResult<impl
let query_dispatcher = SimpleQueryDispatcherBuilder::default()
.with_input(input)
.with_func_manager(Arc::new(func_manager))
.with_default_table_provider(default_table_provider)
.with_session_factory(session_factory)
.with_parser(parser)

View File

@@ -1,6 +1,7 @@
pub mod data_source;
pub mod dispatcher;
pub mod execution;
pub mod function;
pub mod instance;
pub mod metadata;
pub mod sql;

View File

@@ -3,7 +3,7 @@ use std::sync::Arc;
use datafusion::common::Result as DFResult;
use datafusion::datasource::listing::ListingTable;
use crate::data_source::data_source::TableHandle;
use crate::data_source::table_source::TableHandle;
use super::TableHandleProvider;

View File

@@ -1,23 +1,19 @@
use std::sync::Arc;
use api::query::{function::FuncMetaManagerRef, session::SessionCtx};
use api::ResolvedTable;
use async_trait::async_trait;
use datafusion::arrow::datatypes::DataType;
use datafusion::common::Result as DFResult;
use datafusion::datasource::listing::ListingTable;
use datafusion::error::DataFusionError;
use datafusion::execution::SessionState;
use datafusion::logical_expr::var_provider::is_system_variables;
use datafusion::logical_expr::{AggregateUDF, ScalarUDF, TableSource, WindowUDF};
use datafusion::sql::ResolvedTableReference;
use datafusion::variable::VarType;
use datafusion::{
config::ConfigOptions,
sql::{planner::ContextProvider, TableReference},
};
use crate::data_source::data_source::{TableHandle, TableSourceAdapter};
use crate::data_source::table_source::{TableHandle, TableSourceAdapter};
pub mod base_table;
@@ -58,11 +54,8 @@ impl MetadataProvider {
}
fn build_table_handle(&self) -> datafusion::common::Result<TableHandle> {
self.current_session_table_provider
.build_table_handle(self.provider.clone())
self.current_session_table_provider.build_table_handle(self.provider.clone())
}
async fn init(&self) {}
}
impl ContextProviderExtension for MetadataProvider {

View File

@@ -23,8 +23,6 @@ use tracing::debug;
use crate::sql::analyzer::DefaultAnalyzer;
const PUSH_DOWN_PROJECTION_INDEX: usize = 24;
pub trait LogicalOptimizer: Send + Sync {
fn optimize(&self, plan: &QueryPlan, session: &SessionCtx) -> QueryResult<LogicalPlan>;
@@ -93,7 +91,7 @@ impl Default for DefaultLogicalOptimizer {
impl LogicalOptimizer for DefaultLogicalOptimizer {
fn optimize(&self, plan: &QueryPlan, session: &SessionCtx) -> QueryResult<LogicalPlan> {
let analyzed_plan = { self.analyzer.analyze(&plan.df_plan, session).map(|p| p).map_err(|e| e)? };
let analyzed_plan = { self.analyzer.analyze(&plan.df_plan, session)? };
debug!("Analyzed logical plan:\n{}\n", plan.df_plan.display_indent_schema(),);
@@ -101,9 +99,7 @@ impl LogicalOptimizer for DefaultLogicalOptimizer {
SessionStateBuilder::new_from_existing(session.inner().clone())
.with_optimizer_rules(self.rules.clone())
.build()
.optimize(&analyzed_plan)
.map(|p| p)
.map_err(|e| e)?
.optimize(&analyzed_plan)?
};
Ok(optimizeed_plan)
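
The change above (and the matching ones in the physical optimizer and SQL planner below) removes a purely mechanical pattern: `.map(|p| p)` and `.map_err(|e| e)` are identity transforms, so applying `?` to the original `Result` is equivalent. A tiny standalone illustration (not project code):

fn parse_once(input: &str) -> Result<u64, std::num::ParseIntError> {
    // The identity map/map_err calls change nothing; the two bindings are equivalent.
    let verbose: u64 = input.parse().map(|p| p).map_err(|e| e)?;
    let concise: u64 = input.parse()?;
    assert_eq!(verbose, concise);
    Ok(concise)
}

fn main() {
    assert_eq!(parse_once("42"), Ok(42));
    assert!(parse_once("not a number").is_err());
}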

View File

@@ -31,18 +31,14 @@ impl Optimizer for CascadeOptimizer {
let physical_plan = {
self.physical_planner
.create_physical_plan(&optimized_logical_plan, session)
.await
.map(|p| p)
.map_err(|err| err)?
.await?
};
debug!("Original physical plan:\n{}\n", displayable(physical_plan.as_ref()).indent(false));
let optimized_physical_plan = {
self.physical_optimizer
.optimize(physical_plan, session)
.map(|p| p)
.map_err(|err| err)?
.optimize(physical_plan, session)?
};
Ok(optimized_physical_plan)

View File

@@ -82,12 +82,7 @@ impl<'a> ExtParser<'a> {
/// Parse a new expression
fn parse_statement(&mut self) -> Result<ExtStatement> {
match self.parser.peek_token().token {
Token::Word(w) => match w.keyword {
_ => Ok(ExtStatement::SqlStatement(Box::new(self.parser.parse_statement()?))),
},
_ => Ok(ExtStatement::SqlStatement(Box::new(self.parser.parse_statement()?))),
}
Ok(ExtStatement::SqlStatement(Box::new(self.parser.parse_statement()?)))
}
// Report unexpected token

View File

@@ -13,14 +13,14 @@ use datafusion::sql::{planner::SqlToRel, sqlparser::ast::Statement};
use crate::metadata::ContextProviderExtension;
pub struct SqlPlanner<'a, S: ContextProviderExtension> {
schema_provider: &'a S,
_schema_provider: &'a S,
df_planner: SqlToRel<'a, S>,
}
#[async_trait]
impl<'a, S: ContextProviderExtension + Send + Sync> LogicalPlanner for SqlPlanner<'a, S> {
impl<S: ContextProviderExtension + Send + Sync> LogicalPlanner for SqlPlanner<'_, S> {
async fn create_logical_plan(&self, statement: ExtStatement, session: &SessionCtx) -> QueryResult<Plan> {
let plan = { self.statement_to_plan(statement, session).await.map_err(|err| err)? };
let plan = { self.statement_to_plan(statement, session).await? };
Ok(plan)
}
@@ -30,7 +30,7 @@ impl<'a, S: ContextProviderExtension + Send + Sync + 'a> SqlPlanner<'a, S> {
/// Create a new query planner
pub fn new(schema_provider: &'a S) -> Self {
SqlPlanner {
schema_provider,
_schema_provider: schema_provider,
df_planner: SqlToRel::new(schema_provider),
}
}