From addc964d56b738465fd80c0055ae051552c703cd Mon Sep 17 00:00:00 2001 From: junxiang Mu <1948535941@qq.com> Date: Thu, 24 Jul 2025 17:12:51 +0800 Subject: [PATCH] Refactor: DatabaseManagerSystem as global Signed-off-by: junxiang Mu <1948535941@qq.com> --- crates/s3select-query/src/instance.rs | 50 +++++++++++++++------ crates/s3select-query/src/lib.rs | 62 +++++++++++++++++++++++++++ rustfs/src/storage/ecfs.rs | 7 ++- 3 files changed, 102 insertions(+), 17 deletions(-) diff --git a/crates/s3select-query/src/instance.rs b/crates/s3select-query/src/instance.rs index 403bfd25..2523e9f6 100644 --- a/crates/s3select-query/src/instance.rs +++ b/crates/s3select-query/src/instance.rs @@ -107,27 +107,51 @@ pub async fn make_rustfsms(input: Arc, is_test: bool) Ok(db_server) } +pub async fn make_rustfsms_with_components( + input: Arc, + is_test: bool, + func_manager: Arc, + parser: Arc, + query_execution_factory: Arc, + default_table_provider: Arc, +) -> QueryResult { + // TODO session config need load global system config + let session_factory = Arc::new(SessionCtxFactory { is_test }); + + let query_dispatcher = SimpleQueryDispatcherBuilder::default() + .with_input(input) + .with_func_manager(func_manager) + .with_default_table_provider(default_table_provider) + .with_session_factory(session_factory) + .with_parser(parser) + .with_query_execution_factory(query_execution_factory) + .build()?; + + let mut builder = RustFSmsBuilder::default(); + + let db_server = builder.query_dispatcher(query_dispatcher).build().expect("build db server"); + + Ok(db_server) +} + #[cfg(test)] mod tests { use std::sync::Arc; use datafusion::{arrow::util::pretty, assert_batches_eq}; - use rustfs_s3select_api::{ - query::{Context, Query}, - server::dbms::DatabaseManagerSystem, - }; + use rustfs_s3select_api::query::{Context, Query}; use s3s::dto::{ CSVInput, CSVOutput, ExpressionType, FieldDelimiter, FileHeaderInfo, InputSerialization, OutputSerialization, RecordDelimiter, SelectObjectContentInput, SelectObjectContentRequest, }; - use crate::instance::make_rustfsms; + use crate::get_global_db; #[tokio::test] #[ignore] async fn test_simple_sql() { let sql = "select * from S3Object"; - let input = Arc::new(SelectObjectContentInput { + let input = SelectObjectContentInput { bucket: "dandan".to_string(), expected_bucket_owner: None, key: "test.csv".to_string(), @@ -151,9 +175,9 @@ mod tests { request_progress: None, scan_range: None, }, - }); - let db = make_rustfsms(input.clone(), true).await.unwrap(); - let query = Query::new(Context { input }, sql.to_string()); + }; + let db = get_global_db(input.clone(), true).await.unwrap(); + let query = Query::new(Context { input: Arc::new(input) }, sql.to_string()); let result = db.execute(&query).await.unwrap(); @@ -184,7 +208,7 @@ mod tests { #[ignore] async fn test_func_sql() { let sql = "SELECT * FROM S3Object s"; - let input = Arc::new(SelectObjectContentInput { + let input = SelectObjectContentInput { bucket: "dandan".to_string(), expected_bucket_owner: None, key: "test.csv".to_string(), @@ -210,9 +234,9 @@ mod tests { request_progress: None, scan_range: None, }, - }); - let db = make_rustfsms(input.clone(), true).await.unwrap(); - let query = Query::new(Context { input }, sql.to_string()); + }; + let db = get_global_db(input.clone(), true).await.unwrap(); + let query = Query::new(Context { input: Arc::new(input) }, sql.to_string()); let result = db.execute(&query).await.unwrap(); diff --git a/crates/s3select-query/src/lib.rs b/crates/s3select-query/src/lib.rs index 2f70e4b8..9f1686da 100644 --- a/crates/s3select-query/src/lib.rs +++ b/crates/s3select-query/src/lib.rs @@ -19,3 +19,65 @@ pub mod function; pub mod instance; pub mod metadata; pub mod sql; + +use std::sync::{Arc, LazyLock}; +use rustfs_s3select_api::{QueryResult, server::dbms::DatabaseManagerSystem}; +use s3s::dto::SelectObjectContentInput; + +use crate::{ + execution::{factory::SqlQueryExecutionFactory, scheduler::local::LocalScheduler}, + function::simple_func_manager::SimpleFunctionMetadataManager, + metadata::base_table::BaseTableProvider, + sql::{optimizer::CascadeOptimizerBuilder, parser::DefaultParser}, +}; + +// Global cached components that can be reused across database instances +struct GlobalComponents { + func_manager: Arc, + parser: Arc, + query_execution_factory: Arc, + default_table_provider: Arc, +} + +static GLOBAL_COMPONENTS: LazyLock = LazyLock::new(|| { + let func_manager = Arc::new(SimpleFunctionMetadataManager::default()); + let parser = Arc::new(DefaultParser::default()); + let optimizer = Arc::new(CascadeOptimizerBuilder::default().build()); + let scheduler = Arc::new(LocalScheduler {}); + let query_execution_factory = Arc::new(SqlQueryExecutionFactory::new(optimizer, scheduler)); + let default_table_provider = Arc::new(BaseTableProvider::default()); + + GlobalComponents { + func_manager, + parser, + query_execution_factory, + default_table_provider, + } +}); + +/// Get or create database instance with cached components +pub async fn get_global_db( + input: SelectObjectContentInput, + enable_debug: bool, +) -> QueryResult> { + let components = &*GLOBAL_COMPONENTS; + let db = crate::instance::make_rustfsms_with_components( + Arc::new(input), + enable_debug, + components.func_manager.clone(), + components.parser.clone(), + components.query_execution_factory.clone(), + components.default_table_provider.clone(), + ).await?; + + Ok(Arc::new(db) as Arc) +} + +/// Create a fresh database instance without using cached components (for testing) +pub async fn create_fresh_db( + input: SelectObjectContentInput, + enable_debug: bool, +) -> QueryResult> { + let db = crate::instance::make_rustfsms(Arc::new(input), enable_debug).await?; + Ok(Arc::new(db) as Arc) +} diff --git a/rustfs/src/storage/ecfs.rs b/rustfs/src/storage/ecfs.rs index 46c3e0ed..06500864 100644 --- a/rustfs/src/storage/ecfs.rs +++ b/rustfs/src/storage/ecfs.rs @@ -32,7 +32,7 @@ use rustfs_ecstore::set_disk::MAX_PARTS_COUNT; use rustfs_s3select_api::object_store::bytes_stream; use rustfs_s3select_api::query::Context; use rustfs_s3select_api::query::Query; -use rustfs_s3select_api::server::dbms::DatabaseManagerSystem; +use rustfs_s3select_query::get_global_db; // use rustfs_ecstore::store_api::RESERVED_METADATA_PREFIX; use futures::StreamExt; @@ -86,7 +86,6 @@ use rustfs_rio::EtagReader; use rustfs_rio::HashReader; use rustfs_rio::Reader; use rustfs_rio::WarpReader; -use rustfs_s3select_query::instance::make_rustfsms; use rustfs_utils::CompressionAlgorithm; use rustfs_utils::path::path_join_buf; use rustfs_zip::CompressionFormat; @@ -2674,8 +2673,8 @@ impl S3 for FS { let input = Arc::new(req.input); info!("{:?}", input); - let db = make_rustfsms(input.clone(), false).await.map_err(|e| { - error!("make db failed, {}", e.to_string()); + let db = get_global_db((*input).clone(), false).await.map_err(|e| { + error!("get global db failed, {}", e.to_string()); s3_error!(InternalError, "{}", e.to_string()) })?; let query = Query::new(Context { input: input.clone() }, input.request.expression.clone());