mirror of
https://github.com/rustfs/rustfs.git
synced 2026-01-17 01:30:33 +00:00
Refactor: DatabaseManagerSystem as global
Signed-off-by: junxiang Mu <1948535941@qq.com>
This commit is contained in:
@@ -107,27 +107,51 @@ pub async fn make_rustfsms(input: Arc<SelectObjectContentInput>, is_test: bool)
|
||||
Ok(db_server)
|
||||
}
|
||||
|
||||
pub async fn make_rustfsms_with_components(
|
||||
input: Arc<SelectObjectContentInput>,
|
||||
is_test: bool,
|
||||
func_manager: Arc<SimpleFunctionMetadataManager>,
|
||||
parser: Arc<DefaultParser>,
|
||||
query_execution_factory: Arc<SqlQueryExecutionFactory>,
|
||||
default_table_provider: Arc<BaseTableProvider>,
|
||||
) -> QueryResult<impl DatabaseManagerSystem> {
|
||||
// TODO session config need load global system config
|
||||
let session_factory = Arc::new(SessionCtxFactory { is_test });
|
||||
|
||||
let query_dispatcher = SimpleQueryDispatcherBuilder::default()
|
||||
.with_input(input)
|
||||
.with_func_manager(func_manager)
|
||||
.with_default_table_provider(default_table_provider)
|
||||
.with_session_factory(session_factory)
|
||||
.with_parser(parser)
|
||||
.with_query_execution_factory(query_execution_factory)
|
||||
.build()?;
|
||||
|
||||
let mut builder = RustFSmsBuilder::default();
|
||||
|
||||
let db_server = builder.query_dispatcher(query_dispatcher).build().expect("build db server");
|
||||
|
||||
Ok(db_server)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use datafusion::{arrow::util::pretty, assert_batches_eq};
|
||||
use rustfs_s3select_api::{
|
||||
query::{Context, Query},
|
||||
server::dbms::DatabaseManagerSystem,
|
||||
};
|
||||
use rustfs_s3select_api::query::{Context, Query};
|
||||
use s3s::dto::{
|
||||
CSVInput, CSVOutput, ExpressionType, FieldDelimiter, FileHeaderInfo, InputSerialization, OutputSerialization,
|
||||
RecordDelimiter, SelectObjectContentInput, SelectObjectContentRequest,
|
||||
};
|
||||
|
||||
use crate::instance::make_rustfsms;
|
||||
use crate::get_global_db;
|
||||
|
||||
#[tokio::test]
|
||||
#[ignore]
|
||||
async fn test_simple_sql() {
|
||||
let sql = "select * from S3Object";
|
||||
let input = Arc::new(SelectObjectContentInput {
|
||||
let input = SelectObjectContentInput {
|
||||
bucket: "dandan".to_string(),
|
||||
expected_bucket_owner: None,
|
||||
key: "test.csv".to_string(),
|
||||
@@ -151,9 +175,9 @@ mod tests {
|
||||
request_progress: None,
|
||||
scan_range: None,
|
||||
},
|
||||
});
|
||||
let db = make_rustfsms(input.clone(), true).await.unwrap();
|
||||
let query = Query::new(Context { input }, sql.to_string());
|
||||
};
|
||||
let db = get_global_db(input.clone(), true).await.unwrap();
|
||||
let query = Query::new(Context { input: Arc::new(input) }, sql.to_string());
|
||||
|
||||
let result = db.execute(&query).await.unwrap();
|
||||
|
||||
@@ -184,7 +208,7 @@ mod tests {
|
||||
#[ignore]
|
||||
async fn test_func_sql() {
|
||||
let sql = "SELECT * FROM S3Object s";
|
||||
let input = Arc::new(SelectObjectContentInput {
|
||||
let input = SelectObjectContentInput {
|
||||
bucket: "dandan".to_string(),
|
||||
expected_bucket_owner: None,
|
||||
key: "test.csv".to_string(),
|
||||
@@ -210,9 +234,9 @@ mod tests {
|
||||
request_progress: None,
|
||||
scan_range: None,
|
||||
},
|
||||
});
|
||||
let db = make_rustfsms(input.clone(), true).await.unwrap();
|
||||
let query = Query::new(Context { input }, sql.to_string());
|
||||
};
|
||||
let db = get_global_db(input.clone(), true).await.unwrap();
|
||||
let query = Query::new(Context { input: Arc::new(input) }, sql.to_string());
|
||||
|
||||
let result = db.execute(&query).await.unwrap();
|
||||
|
||||
|
||||
@@ -19,3 +19,65 @@ pub mod function;
|
||||
pub mod instance;
|
||||
pub mod metadata;
|
||||
pub mod sql;
|
||||
|
||||
use std::sync::{Arc, LazyLock};
|
||||
use rustfs_s3select_api::{QueryResult, server::dbms::DatabaseManagerSystem};
|
||||
use s3s::dto::SelectObjectContentInput;
|
||||
|
||||
use crate::{
|
||||
execution::{factory::SqlQueryExecutionFactory, scheduler::local::LocalScheduler},
|
||||
function::simple_func_manager::SimpleFunctionMetadataManager,
|
||||
metadata::base_table::BaseTableProvider,
|
||||
sql::{optimizer::CascadeOptimizerBuilder, parser::DefaultParser},
|
||||
};
|
||||
|
||||
// Global cached components that can be reused across database instances
|
||||
struct GlobalComponents {
|
||||
func_manager: Arc<SimpleFunctionMetadataManager>,
|
||||
parser: Arc<DefaultParser>,
|
||||
query_execution_factory: Arc<SqlQueryExecutionFactory>,
|
||||
default_table_provider: Arc<BaseTableProvider>,
|
||||
}
|
||||
|
||||
static GLOBAL_COMPONENTS: LazyLock<GlobalComponents> = LazyLock::new(|| {
|
||||
let func_manager = Arc::new(SimpleFunctionMetadataManager::default());
|
||||
let parser = Arc::new(DefaultParser::default());
|
||||
let optimizer = Arc::new(CascadeOptimizerBuilder::default().build());
|
||||
let scheduler = Arc::new(LocalScheduler {});
|
||||
let query_execution_factory = Arc::new(SqlQueryExecutionFactory::new(optimizer, scheduler));
|
||||
let default_table_provider = Arc::new(BaseTableProvider::default());
|
||||
|
||||
GlobalComponents {
|
||||
func_manager,
|
||||
parser,
|
||||
query_execution_factory,
|
||||
default_table_provider,
|
||||
}
|
||||
});
|
||||
|
||||
/// Get or create database instance with cached components
|
||||
pub async fn get_global_db(
|
||||
input: SelectObjectContentInput,
|
||||
enable_debug: bool,
|
||||
) -> QueryResult<Arc<dyn DatabaseManagerSystem + Send + Sync>> {
|
||||
let components = &*GLOBAL_COMPONENTS;
|
||||
let db = crate::instance::make_rustfsms_with_components(
|
||||
Arc::new(input),
|
||||
enable_debug,
|
||||
components.func_manager.clone(),
|
||||
components.parser.clone(),
|
||||
components.query_execution_factory.clone(),
|
||||
components.default_table_provider.clone(),
|
||||
).await?;
|
||||
|
||||
Ok(Arc::new(db) as Arc<dyn DatabaseManagerSystem + Send + Sync>)
|
||||
}
|
||||
|
||||
/// Create a fresh database instance without using cached components (for testing)
|
||||
pub async fn create_fresh_db(
|
||||
input: SelectObjectContentInput,
|
||||
enable_debug: bool,
|
||||
) -> QueryResult<Arc<dyn DatabaseManagerSystem + Send + Sync>> {
|
||||
let db = crate::instance::make_rustfsms(Arc::new(input), enable_debug).await?;
|
||||
Ok(Arc::new(db) as Arc<dyn DatabaseManagerSystem + Send + Sync>)
|
||||
}
|
||||
|
||||
@@ -32,7 +32,7 @@ use rustfs_ecstore::set_disk::MAX_PARTS_COUNT;
|
||||
use rustfs_s3select_api::object_store::bytes_stream;
|
||||
use rustfs_s3select_api::query::Context;
|
||||
use rustfs_s3select_api::query::Query;
|
||||
use rustfs_s3select_api::server::dbms::DatabaseManagerSystem;
|
||||
use rustfs_s3select_query::get_global_db;
|
||||
|
||||
// use rustfs_ecstore::store_api::RESERVED_METADATA_PREFIX;
|
||||
use futures::StreamExt;
|
||||
@@ -86,7 +86,6 @@ use rustfs_rio::EtagReader;
|
||||
use rustfs_rio::HashReader;
|
||||
use rustfs_rio::Reader;
|
||||
use rustfs_rio::WarpReader;
|
||||
use rustfs_s3select_query::instance::make_rustfsms;
|
||||
use rustfs_utils::CompressionAlgorithm;
|
||||
use rustfs_utils::path::path_join_buf;
|
||||
use rustfs_zip::CompressionFormat;
|
||||
@@ -2674,8 +2673,8 @@ impl S3 for FS {
|
||||
let input = Arc::new(req.input);
|
||||
info!("{:?}", input);
|
||||
|
||||
let db = make_rustfsms(input.clone(), false).await.map_err(|e| {
|
||||
error!("make db failed, {}", e.to_string());
|
||||
let db = get_global_db((*input).clone(), false).await.map_err(|e| {
|
||||
error!("get global db failed, {}", e.to_string());
|
||||
s3_error!(InternalError, "{}", e.to_string())
|
||||
})?;
|
||||
let query = Query::new(Context { input: input.clone() }, input.request.expression.clone());
|
||||
|
||||
Reference in New Issue
Block a user