From 83e2c8f69fa2c59543389ce12e209ef316a13539 Mon Sep 17 00:00:00 2001 From: junxiang Mu <1948535941@qq.com> Date: Fri, 14 Mar 2025 03:26:32 +0000 Subject: [PATCH] tmp3 Signed-off-by: junxiang Mu <1948535941@qq.com> --- ecstore/src/utils/os/linux.rs | 5 +- rustfs/src/storage/ecfs.rs | 17 ++--- s3select/api/src/lib.rs | 6 ++ s3select/api/src/object_store.rs | 2 +- s3select/query/src/data_source/mod.rs | 2 +- .../{data_source.rs => table_source.rs} | 0 s3select/query/src/dispatcher/manager.rs | 13 ++-- s3select/query/src/function/mod.rs | 1 + .../query/src/function/simple_func_manager.rs | 63 +++++++++++++++++++ s3select/query/src/instance.rs | 4 ++ s3select/query/src/lib.rs | 1 + s3select/query/src/metadata/base_table.rs | 2 +- s3select/query/src/metadata/mod.rs | 11 +--- s3select/query/src/sql/logical/optimizer.rs | 8 +-- s3select/query/src/sql/optimizer.rs | 8 +-- s3select/query/src/sql/parser.rs | 7 +-- s3select/query/src/sql/planner.rs | 8 +-- 17 files changed, 105 insertions(+), 53 deletions(-) rename s3select/query/src/data_source/{data_source.rs => table_source.rs} (100%) create mode 100644 s3select/query/src/function/mod.rs create mode 100644 s3select/query/src/function/simple_func_manager.rs diff --git a/ecstore/src/utils/os/linux.rs b/ecstore/src/utils/os/linux.rs index ca64f818..064b74ae 100644 --- a/ecstore/src/utils/os/linux.rs +++ b/ecstore/src/utils/os/linux.rs @@ -151,7 +151,7 @@ fn read_drive_stats(stats_file: &str) -> Result { fn read_stat(file_name: &str) -> Result> { // 打开文件 let path = Path::new(file_name); - let file = File::open(&path)?; + let file = File::open(path)?; // 创建一个 BufReader let reader = io::BufReader::new(file); @@ -161,7 +161,8 @@ fn read_stat(file_name: &str) -> Result> { if let Some(line) = reader.lines().next() { let line = line?; // 分割行并解析为 u64 - for token in line.trim().split_whitespace() { + // https://rust-lang.github.io/rust-clippy/master/index.html#trim_split_whitespace + for token in line.split_whitespace() { let ui64: 
u64 = token.parse()?; stats.push(ui64); } diff --git a/rustfs/src/storage/ecfs.rs b/rustfs/src/storage/ecfs.rs index 65544be5..6a83beae 100644 --- a/rustfs/src/storage/ecfs.rs +++ b/rustfs/src/storage/ecfs.rs @@ -6,9 +6,12 @@ use crate::auth::get_condition_values; use crate::storage::access::ReqInfo; use api::query::Context; use api::query::Query; +use api::server::dbms::DatabaseManagerSystem; use bytes::Bytes; use common::error::Result; +use datafusion::arrow::csv::WriterBuilder as CsvWriterBuilder; use datafusion::arrow::json::writer::JsonArray; +use datafusion::arrow::json::WriterBuilder as JsonWriterBuilder; use ecstore::bucket::error::BucketMetadataError; use ecstore::bucket::metadata::BUCKET_LIFECYCLE_CONFIG; use ecstore::bucket::metadata::BUCKET_NOTIFICATION_CONFIG; @@ -43,7 +46,6 @@ use futures::pin_mut; use futures::{Stream, StreamExt}; use http::HeaderMap; use iam::policy::action::Action; -use api::server::dbms::DatabaseManagerSystem; use iam::policy::action::S3Action; use lazy_static::lazy_static; use log::warn; @@ -61,10 +63,10 @@ use s3s::S3ErrorCode; use s3s::S3Result; use s3s::S3; use s3s::{S3Request, S3Response}; -use tokio::sync::mpsc; -use tokio_stream::wrappers::ReceiverStream; use std::fmt::Debug; use std::str::FromStr; +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; use tokio_util::io::ReaderStream; use tokio_util::io::StreamReader; use tracing::debug; @@ -1878,13 +1880,12 @@ impl S3 for FS { let input = req.input; info!("{:?}", input); - let db = make_cnosdbms(input).await.map_err(|_| { - s3_error!(InternalError) - })?; - let query = Query::new(Context {input: input.clone()}, input.request.expression); - let result = db.execute(&query).await.map_err(|_| { + let db = make_cnosdbms(input.clone()).await.map_err(|e| { + error!("make db failed, {}", e.to_string()); s3_error!(InternalError) })?; + let query = Query::new(Context { input: input.clone() }, input.request.expression); + let result = 
db.execute(&query).await.map_err(|_| s3_error!(InternalError))?; let results = result.result().chunk_result().await.unwrap().to_vec(); diff --git a/s3select/api/src/lib.rs b/s3select/api/src/lib.rs index 9c61128b..72a9e9c7 100644 --- a/s3select/api/src/lib.rs +++ b/s3select/api/src/lib.rs @@ -32,6 +32,12 @@ pub enum QueryError { #[snafu(display("{}", source))] Parser { source: ParserError }, + + #[snafu(display("Udf not exists, name:{}.", name))] + FunctionNotExists { name: String }, + + #[snafu(display("Udf already exists, name:{}.", name))] + FunctionExists { name: String }, } impl From for QueryError { diff --git a/s3select/api/src/object_store.rs b/s3select/api/src/object_store.rs index c10a26fa..bb9273cd 100644 --- a/s3select/api/src/object_store.rs +++ b/s3select/api/src/object_store.rs @@ -79,7 +79,7 @@ impl ObjectStore for EcObjectStore { let stream = stream::unfold(reader.stream, |mut blob| async move { match blob.next().await { Some(Ok(chunk)) => { - let bytes = Bytes::from(chunk); + let bytes = chunk; Some((Ok(bytes), blob)) } _ => None, diff --git a/s3select/query/src/data_source/mod.rs b/s3select/query/src/data_source/mod.rs index 5b53f15a..b0704130 100644 --- a/s3select/query/src/data_source/mod.rs +++ b/s3select/query/src/data_source/mod.rs @@ -1 +1 @@ -pub mod data_source; +pub mod table_source; diff --git a/s3select/query/src/data_source/data_source.rs b/s3select/query/src/data_source/table_source.rs similarity index 100% rename from s3select/query/src/data_source/data_source.rs rename to s3select/query/src/data_source/table_source.rs diff --git a/s3select/query/src/dispatcher/manager.rs b/s3select/query/src/dispatcher/manager.rs index effe17bf..3975c4e5 100644 --- a/s3select/query/src/dispatcher/manager.rs +++ b/s3select/query/src/dispatcher/manager.rs @@ -1,7 +1,6 @@ use std::{ - collections::HashMap, pin::Pin, - sync::{Arc, Mutex}, + sync::Arc, task::{Context, Poll}, }; @@ -31,14 +30,10 @@ use datafusion::{ }; use futures::{Stream, StreamExt}; 
use s3s::dto::SelectObjectContentInput; -use tokio::task::JoinHandle; use crate::{ execution::factory::QueryExecutionFactoryRef, - metadata::{ - base_table::BaseTableProvider, - ContextProviderExtension, MetadataProvider, TableHandleProviderRef, - }, + metadata::{base_table::BaseTableProvider, ContextProviderExtension, MetadataProvider, TableHandleProviderRef}, sql::logical::planner::DefaultLogicalPlanner, }; @@ -46,7 +41,7 @@ use crate::{ pub struct SimpleQueryDispatcher { input: SelectObjectContentInput, // client for default tenant - default_table_provider: TableHandleProviderRef, + _default_table_provider: TableHandleProviderRef, session_factory: Arc, // parser parser: Arc, @@ -264,7 +259,7 @@ impl SimpleQueryDispatcherBuilder { let dispatcher = Arc::new(SimpleQueryDispatcher { input, - default_table_provider, + _default_table_provider: default_table_provider, session_factory, parser, query_execution_factory, diff --git a/s3select/query/src/function/mod.rs b/s3select/query/src/function/mod.rs new file mode 100644 index 00000000..e76614a0 --- /dev/null +++ b/s3select/query/src/function/mod.rs @@ -0,0 +1 @@ +pub mod simple_func_manager; diff --git a/s3select/query/src/function/simple_func_manager.rs b/s3select/query/src/function/simple_func_manager.rs new file mode 100644 index 00000000..129efacf --- /dev/null +++ b/s3select/query/src/function/simple_func_manager.rs @@ -0,0 +1,63 @@ +use std::collections::{HashMap, HashSet}; +use std::sync::Arc; + +use api::query::function::FunctionMetadataManager; +use api::{QueryError, QueryResult}; +use datafusion::logical_expr::{AggregateUDF, ScalarUDF, WindowUDF}; + +pub type SimpleFunctionMetadataManagerRef = Arc; + +#[derive(Debug, Default)] +pub struct SimpleFunctionMetadataManager { + /// Scalar functions that are registered with the context + pub scalar_functions: HashMap>, + /// Aggregate functions registered in the context + pub aggregate_functions: HashMap>, + /// Window functions registered in the context + pub 
window_functions: HashMap>, +} + +impl FunctionMetadataManager for SimpleFunctionMetadataManager { + fn register_udf(&mut self, f: ScalarUDF) -> QueryResult<()> { + self.scalar_functions.insert(f.inner().name().to_uppercase(), Arc::new(f)); + Ok(()) + } + + fn register_udaf(&mut self, f: AggregateUDF) -> QueryResult<()> { + self.aggregate_functions.insert(f.inner().name().to_uppercase(), Arc::new(f)); + Ok(()) + } + + fn register_udwf(&mut self, f: WindowUDF) -> QueryResult<()> { + self.window_functions.insert(f.inner().name().to_uppercase(), Arc::new(f)); + Ok(()) + } + + fn udf(&self, name: &str) -> QueryResult> { + let result = self.scalar_functions.get(&name.to_uppercase()); + + result + .cloned() + .ok_or_else(|| QueryError::FunctionNotExists { name: name.to_string() }) + } + + fn udaf(&self, name: &str) -> QueryResult> { + let result = self.aggregate_functions.get(&name.to_uppercase()); + + result + .cloned() + .ok_or_else(|| QueryError::FunctionNotExists { name: name.to_string() }) + } + + fn udwf(&self, name: &str) -> QueryResult> { + let result = self.window_functions.get(&name.to_uppercase()); + + result + .cloned() + .ok_or_else(|| QueryError::FunctionNotExists { name: name.to_string() }) + } + + fn udfs(&self) -> HashSet { + self.scalar_functions.keys().cloned().collect() + } +} diff --git a/s3select/query/src/instance.rs b/s3select/query/src/instance.rs index 781a88f6..9f4b601b 100644 --- a/s3select/query/src/instance.rs +++ b/s3select/query/src/instance.rs @@ -14,6 +14,7 @@ use s3s::dto::SelectObjectContentInput; use crate::{ dispatcher::manager::SimpleQueryDispatcherBuilder, execution::{factory::SqlQueryExecutionFactory, scheduler::local::LocalScheduler}, + function::simple_func_manager::SimpleFunctionMetadataManager, metadata::base_table::BaseTableProvider, sql::{optimizer::CascadeOptimizerBuilder, parser::DefaultParser}, }; @@ -63,6 +64,8 @@ where } pub async fn make_cnosdbms(input: SelectObjectContentInput) -> QueryResult { + // init Function 
Manager, we can define some UDF if need + let func_manager = SimpleFunctionMetadataManager::default(); // TODO session config need load global system config let session_factory = Arc::new(SessionCtxFactory {}); let parser = Arc::new(DefaultParser::default()); @@ -76,6 +79,7 @@ pub async fn make_cnosdbms(input: SelectObjectContentInput) -> QueryResult datafusion::common::Result { - self.current_session_table_provider - .build_table_handle(self.provider.clone()) + self.current_session_table_provider.build_table_handle(self.provider.clone()) } - - async fn init(&self) {} } impl ContextProviderExtension for MetadataProvider { diff --git a/s3select/query/src/sql/logical/optimizer.rs b/s3select/query/src/sql/logical/optimizer.rs index 375667ca..e97e2967 100644 --- a/s3select/query/src/sql/logical/optimizer.rs +++ b/s3select/query/src/sql/logical/optimizer.rs @@ -23,8 +23,6 @@ use tracing::debug; use crate::sql::analyzer::DefaultAnalyzer; -const PUSH_DOWN_PROJECTION_INDEX: usize = 24; - pub trait LogicalOptimizer: Send + Sync { fn optimize(&self, plan: &QueryPlan, session: &SessionCtx) -> QueryResult; @@ -93,7 +91,7 @@ impl Default for DefaultLogicalOptimizer { impl LogicalOptimizer for DefaultLogicalOptimizer { fn optimize(&self, plan: &QueryPlan, session: &SessionCtx) -> QueryResult { - let analyzed_plan = { self.analyzer.analyze(&plan.df_plan, session).map(|p| p).map_err(|e| e)? }; + let analyzed_plan = { self.analyzer.analyze(&plan.df_plan, session)? }; debug!("Analyzed logical plan:\n{}\n", plan.df_plan.display_indent_schema(),); @@ -101,9 +99,7 @@ impl LogicalOptimizer for DefaultLogicalOptimizer { SessionStateBuilder::new_from_existing(session.inner().clone()) .with_optimizer_rules(self.rules.clone()) .build() - .optimize(&analyzed_plan) - .map(|p| p) - .map_err(|e| e)? + .optimize(&analyzed_plan)? 
}; Ok(optimizeed_plan) diff --git a/s3select/query/src/sql/optimizer.rs b/s3select/query/src/sql/optimizer.rs index 2b546547..13da4eb3 100644 --- a/s3select/query/src/sql/optimizer.rs +++ b/s3select/query/src/sql/optimizer.rs @@ -31,18 +31,14 @@ impl Optimizer for CascadeOptimizer { let physical_plan = { self.physical_planner .create_physical_plan(&optimized_logical_plan, session) - .await - .map(|p| p) - .map_err(|err| err)? + .await? }; debug!("Original physical plan:\n{}\n", displayable(physical_plan.as_ref()).indent(false)); let optimized_physical_plan = { self.physical_optimizer - .optimize(physical_plan, session) - .map(|p| p) - .map_err(|err| err)? + .optimize(physical_plan, session)? }; Ok(optimized_physical_plan) diff --git a/s3select/query/src/sql/parser.rs b/s3select/query/src/sql/parser.rs index 80f8d0db..ebd2b5d4 100644 --- a/s3select/query/src/sql/parser.rs +++ b/s3select/query/src/sql/parser.rs @@ -82,12 +82,7 @@ impl<'a> ExtParser<'a> { /// Parse a new expression fn parse_statement(&mut self) -> Result { - match self.parser.peek_token().token { - Token::Word(w) => match w.keyword { - _ => Ok(ExtStatement::SqlStatement(Box::new(self.parser.parse_statement()?))), - }, - _ => Ok(ExtStatement::SqlStatement(Box::new(self.parser.parse_statement()?))), - } + Ok(ExtStatement::SqlStatement(Box::new(self.parser.parse_statement()?))) } // Report unexpected token diff --git a/s3select/query/src/sql/planner.rs b/s3select/query/src/sql/planner.rs index be6b314d..a6c9f8c1 100644 --- a/s3select/query/src/sql/planner.rs +++ b/s3select/query/src/sql/planner.rs @@ -13,14 +13,14 @@ use datafusion::sql::{planner::SqlToRel, sqlparser::ast::Statement}; use crate::metadata::ContextProviderExtension; pub struct SqlPlanner<'a, S: ContextProviderExtension> { - schema_provider: &'a S, + _schema_provider: &'a S, df_planner: SqlToRel<'a, S>, } #[async_trait] -impl<'a, S: ContextProviderExtension + Send + Sync> LogicalPlanner for SqlPlanner<'a, S> { +impl LogicalPlanner for 
SqlPlanner<'_, S> { async fn create_logical_plan(&self, statement: ExtStatement, session: &SessionCtx) -> QueryResult { - let plan = { self.statement_to_plan(statement, session).await.map_err(|err| err)? }; + let plan = { self.statement_to_plan(statement, session).await? }; Ok(plan) } @@ -30,7 +30,7 @@ impl<'a, S: ContextProviderExtension + Send + Sync + 'a> SqlPlanner<'a, S> { /// Create a new query planner pub fn new(schema_provider: &'a S) -> Self { SqlPlanner { - schema_provider, + _schema_provider: schema_provider, df_planner: SqlToRel::new(schema_provider), } }