Merge pull request #294 from guojidan/improve-sql

Refactor: DatabaseManagerSystem as global
This commit is contained in:
guojidan
2025-07-25 08:33:54 +08:00
committed by GitHub
3 changed files with 103 additions and 17 deletions

View File

@@ -107,27 +107,51 @@ pub async fn make_rustfsms(input: Arc<SelectObjectContentInput>, is_test: bool)
Ok(db_server)
}
pub async fn make_rustfsms_with_components(
input: Arc<SelectObjectContentInput>,
is_test: bool,
func_manager: Arc<SimpleFunctionMetadataManager>,
parser: Arc<DefaultParser>,
query_execution_factory: Arc<SqlQueryExecutionFactory>,
default_table_provider: Arc<BaseTableProvider>,
) -> QueryResult<impl DatabaseManagerSystem> {
// TODO session config need load global system config
let session_factory = Arc::new(SessionCtxFactory { is_test });
let query_dispatcher = SimpleQueryDispatcherBuilder::default()
.with_input(input)
.with_func_manager(func_manager)
.with_default_table_provider(default_table_provider)
.with_session_factory(session_factory)
.with_parser(parser)
.with_query_execution_factory(query_execution_factory)
.build()?;
let mut builder = RustFSmsBuilder::default();
let db_server = builder.query_dispatcher(query_dispatcher).build().expect("build db server");
Ok(db_server)
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use datafusion::{arrow::util::pretty, assert_batches_eq};
use rustfs_s3select_api::{
query::{Context, Query},
server::dbms::DatabaseManagerSystem,
};
use rustfs_s3select_api::query::{Context, Query};
use s3s::dto::{
CSVInput, CSVOutput, ExpressionType, FieldDelimiter, FileHeaderInfo, InputSerialization, OutputSerialization,
RecordDelimiter, SelectObjectContentInput, SelectObjectContentRequest,
};
use crate::instance::make_rustfsms;
use crate::get_global_db;
#[tokio::test]
#[ignore]
async fn test_simple_sql() {
let sql = "select * from S3Object";
let input = Arc::new(SelectObjectContentInput {
let input = SelectObjectContentInput {
bucket: "dandan".to_string(),
expected_bucket_owner: None,
key: "test.csv".to_string(),
@@ -151,9 +175,9 @@ mod tests {
request_progress: None,
scan_range: None,
},
});
let db = make_rustfsms(input.clone(), true).await.unwrap();
let query = Query::new(Context { input }, sql.to_string());
};
let db = get_global_db(input.clone(), true).await.unwrap();
let query = Query::new(Context { input: Arc::new(input) }, sql.to_string());
let result = db.execute(&query).await.unwrap();
@@ -184,7 +208,7 @@ mod tests {
#[ignore]
async fn test_func_sql() {
let sql = "SELECT * FROM S3Object s";
let input = Arc::new(SelectObjectContentInput {
let input = SelectObjectContentInput {
bucket: "dandan".to_string(),
expected_bucket_owner: None,
key: "test.csv".to_string(),
@@ -210,9 +234,9 @@ mod tests {
request_progress: None,
scan_range: None,
},
});
let db = make_rustfsms(input.clone(), true).await.unwrap();
let query = Query::new(Context { input }, sql.to_string());
};
let db = get_global_db(input.clone(), true).await.unwrap();
let query = Query::new(Context { input: Arc::new(input) }, sql.to_string());
let result = db.execute(&query).await.unwrap();

View File

@@ -19,3 +19,66 @@ pub mod function;
pub mod instance;
pub mod metadata;
pub mod sql;
use rustfs_s3select_api::{QueryResult, server::dbms::DatabaseManagerSystem};
use s3s::dto::SelectObjectContentInput;
use std::sync::{Arc, LazyLock};
use crate::{
execution::{factory::SqlQueryExecutionFactory, scheduler::local::LocalScheduler},
function::simple_func_manager::SimpleFunctionMetadataManager,
metadata::base_table::BaseTableProvider,
sql::{optimizer::CascadeOptimizerBuilder, parser::DefaultParser},
};
// Global cached components that can be reused across database instances
struct GlobalComponents {
func_manager: Arc<SimpleFunctionMetadataManager>,
parser: Arc<DefaultParser>,
query_execution_factory: Arc<SqlQueryExecutionFactory>,
default_table_provider: Arc<BaseTableProvider>,
}
static GLOBAL_COMPONENTS: LazyLock<GlobalComponents> = LazyLock::new(|| {
let func_manager = Arc::new(SimpleFunctionMetadataManager::default());
let parser = Arc::new(DefaultParser::default());
let optimizer = Arc::new(CascadeOptimizerBuilder::default().build());
let scheduler = Arc::new(LocalScheduler {});
let query_execution_factory = Arc::new(SqlQueryExecutionFactory::new(optimizer, scheduler));
let default_table_provider = Arc::new(BaseTableProvider::default());
GlobalComponents {
func_manager,
parser,
query_execution_factory,
default_table_provider,
}
});
/// Get or create database instance with cached components
pub async fn get_global_db(
input: SelectObjectContentInput,
enable_debug: bool,
) -> QueryResult<Arc<dyn DatabaseManagerSystem + Send + Sync>> {
let components = &*GLOBAL_COMPONENTS;
let db = crate::instance::make_rustfsms_with_components(
Arc::new(input),
enable_debug,
components.func_manager.clone(),
components.parser.clone(),
components.query_execution_factory.clone(),
components.default_table_provider.clone(),
)
.await?;
Ok(Arc::new(db) as Arc<dyn DatabaseManagerSystem + Send + Sync>)
}
/// Create a fresh database instance without using cached components (for testing)
pub async fn create_fresh_db(
input: SelectObjectContentInput,
enable_debug: bool,
) -> QueryResult<Arc<dyn DatabaseManagerSystem + Send + Sync>> {
let db = crate::instance::make_rustfsms(Arc::new(input), enable_debug).await?;
Ok(Arc::new(db) as Arc<dyn DatabaseManagerSystem + Send + Sync>)
}

View File

@@ -32,7 +32,7 @@ use rustfs_ecstore::set_disk::MAX_PARTS_COUNT;
use rustfs_s3select_api::object_store::bytes_stream;
use rustfs_s3select_api::query::Context;
use rustfs_s3select_api::query::Query;
use rustfs_s3select_api::server::dbms::DatabaseManagerSystem;
use rustfs_s3select_query::get_global_db;
// use rustfs_ecstore::store_api::RESERVED_METADATA_PREFIX;
use futures::StreamExt;
@@ -86,7 +86,6 @@ use rustfs_rio::EtagReader;
use rustfs_rio::HashReader;
use rustfs_rio::Reader;
use rustfs_rio::WarpReader;
use rustfs_s3select_query::instance::make_rustfsms;
use rustfs_utils::CompressionAlgorithm;
use rustfs_utils::path::path_join_buf;
use rustfs_zip::CompressionFormat;
@@ -2674,8 +2673,8 @@ impl S3 for FS {
let input = Arc::new(req.input);
info!("{:?}", input);
let db = make_rustfsms(input.clone(), false).await.map_err(|e| {
error!("make db failed, {}", e.to_string());
let db = get_global_db((*input).clone(), false).await.map_err(|e| {
error!("get global db failed, {}", e.to_string());
s3_error!(InternalError, "{}", e.to_string())
})?;
let query = Query::new(Context { input: input.clone() }, input.request.expression.clone());