From c03f86b23cc7a477a9434893131c15e3e52900b1 Mon Sep 17 00:00:00 2001 From: junxiang Mu <1948535941@qq.com> Date: Mon, 28 Jul 2025 09:19:47 +0800 Subject: [PATCH] s3Select: add unit test case Signed-off-by: junxiang Mu <1948535941@qq.com> --- crates/s3select-api/src/lib.rs | 82 ++++++ crates/s3select-api/src/test/mod.rs | 17 ++ .../src/test/query_execution_test.rs | 167 ++++++++++++ crates/s3select-query/src/lib.rs | 28 +- .../src/test/error_handling_test.rs | 247 ++++++++++++++++++ .../src/test/integration_test.rs | 228 ++++++++++++++++ crates/s3select-query/src/test/mod.rs | 18 ++ 7 files changed, 782 insertions(+), 5 deletions(-) create mode 100644 crates/s3select-api/src/test/mod.rs create mode 100644 crates/s3select-api/src/test/query_execution_test.rs create mode 100644 crates/s3select-query/src/test/error_handling_test.rs create mode 100644 crates/s3select-query/src/test/integration_test.rs create mode 100644 crates/s3select-query/src/test/mod.rs diff --git a/crates/s3select-api/src/lib.rs b/crates/s3select-api/src/lib.rs index 24722bd7..3cee17e7 100644 --- a/crates/s3select-api/src/lib.rs +++ b/crates/s3select-api/src/lib.rs @@ -21,6 +21,9 @@ pub mod object_store; pub mod query; pub mod server; +#[cfg(test)] +mod test; + pub type QueryResult = Result; #[derive(Debug, Snafu)] @@ -90,3 +93,82 @@ impl Display for ResolvedTable { write!(f, "{table}") } } + +#[cfg(test)] +mod tests { + use super::*; + use datafusion::common::DataFusionError; + use datafusion::sql::sqlparser::parser::ParserError; + + #[test] + fn test_query_error_display() { + let err = QueryError::NotImplemented { + err: "feature X".to_string(), + }; + assert_eq!(err.to_string(), "This feature is not implemented: feature X"); + + let err = QueryError::MultiStatement { + num: 2, + sql: "SELECT 1; SELECT 2;".to_string(), + }; + assert_eq!(err.to_string(), "Multi-statement not allow, found num:2, sql:SELECT 1; SELECT 2;"); + + let err = QueryError::Cancel; + assert_eq!(err.to_string(), "The query has been canceled"); + + let err = QueryError::FunctionNotExists { + name: "my_func".to_string(), + }; + assert_eq!(err.to_string(), "Udf not exists, name:my_func."); + + let err = QueryError::StoreError { + e: "connection failed".to_string(), + }; + assert_eq!(err.to_string(), "Store Error, e:connection failed."); + } + + #[test] + fn test_query_error_from_datafusion_error() { + let df_error = DataFusionError::Plan("invalid plan".to_string()); + let query_error: QueryError = df_error.into(); + + match query_error { + QueryError::Datafusion { source, .. } => { + assert!(source.to_string().contains("invalid plan")); + } + _ => panic!("Expected Datafusion error"), + } + } + + #[test] + fn test_query_error_from_parser_error() { + let parser_error = ParserError::ParserError("syntax error".to_string()); + let query_error = QueryError::Parser { source: parser_error }; + + assert!(query_error.to_string().contains("syntax error")); + } + + #[test] + fn test_resolved_table() { + let table = ResolvedTable { + table: "my_table".to_string(), + }; + + assert_eq!(table.table(), "my_table"); + assert_eq!(table.to_string(), "my_table"); + } + + #[test] + fn test_resolved_table_clone_and_eq() { + let table1 = ResolvedTable { + table: "table1".to_string(), + }; + let table2 = table1.clone(); + let table3 = ResolvedTable { + table: "table2".to_string(), + }; + + assert_eq!(table1, table2); + assert_ne!(table1, table3); + } +} diff --git a/crates/s3select-api/src/test/mod.rs b/crates/s3select-api/src/test/mod.rs new file mode 100644 index 00000000..18c292be --- /dev/null +++ b/crates/s3select-api/src/test/mod.rs @@ -0,0 +1,17 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Test modules for s3select-api + +pub mod query_execution_test; diff --git a/crates/s3select-api/src/test/query_execution_test.rs b/crates/s3select-api/src/test/query_execution_test.rs new file mode 100644 index 00000000..25c65331 --- /dev/null +++ b/crates/s3select-api/src/test/query_execution_test.rs @@ -0,0 +1,167 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#[cfg(test)] +mod tests { + use crate::query::execution::{DONE, Output, QueryExecution, QueryState, QueryType, RUNNING}; + use crate::{QueryError, QueryResult}; + use async_trait::async_trait; + + #[test] + fn test_query_type_display() { + assert_eq!(format!("{}", QueryType::Batch), "batch"); + assert_eq!(format!("{}", QueryType::Stream), "stream"); + } + + #[test] + fn test_query_type_equality() { + assert_eq!(QueryType::Batch, QueryType::Batch); + assert_ne!(QueryType::Batch, QueryType::Stream); + assert_eq!(QueryType::Stream, QueryType::Stream); + } + + #[tokio::test] + async fn test_output_nil_methods() { + let output = Output::Nil(()); + + let result = output.chunk_result().await; + assert!(result.is_ok(), "Output::Nil result should be Ok"); + + let output2 = Output::Nil(()); + let rows = output2.num_rows().await; + assert_eq!(rows, 0, "Output::Nil should have 0 rows"); + + let output3 = Output::Nil(()); + let affected = output3.affected_rows().await; + assert_eq!(affected, 0, "Output::Nil should have 0 affected rows"); + } + + #[test] + fn test_query_state_as_ref() { + let accepting = QueryState::ACCEPTING; + assert_eq!(accepting.as_ref(), "ACCEPTING"); + + let running = QueryState::RUNNING(RUNNING::ANALYZING); + assert_eq!(running.as_ref(), "ANALYZING"); + + let done = QueryState::DONE(DONE::FINISHED); + assert_eq!(done.as_ref(), "FINISHED"); + } + + #[test] + fn test_running_state_as_ref() { + assert_eq!(RUNNING::DISPATCHING.as_ref(), "DISPATCHING"); + assert_eq!(RUNNING::ANALYZING.as_ref(), "ANALYZING"); + assert_eq!(RUNNING::OPTMIZING.as_ref(), "OPTMIZING"); + assert_eq!(RUNNING::SCHEDULING.as_ref(), "SCHEDULING"); + } + + #[test] + fn test_done_state_as_ref() { + assert_eq!(DONE::FINISHED.as_ref(), "FINISHED"); + assert_eq!(DONE::FAILED.as_ref(), "FAILED"); + assert_eq!(DONE::CANCELLED.as_ref(), "CANCELLED"); + } + + // Mock implementation for testing + struct MockQueryExecution { + should_succeed: bool, + should_cancel: bool, + } + + #[async_trait] + impl QueryExecution for MockQueryExecution { + async fn start(&self) -> QueryResult { + if self.should_cancel { + return Err(QueryError::Cancel); + } + + if self.should_succeed { + Ok(Output::Nil(())) + } else { + Err(QueryError::NotImplemented { + err: "Mock execution failed".to_string(), + }) + } + } + + fn cancel(&self) -> QueryResult<()> { + Ok(()) + } + } + + #[tokio::test] + async fn test_mock_query_execution_success() { + let execution = MockQueryExecution { + should_succeed: true, + should_cancel: false, + }; + + let result = execution.start().await; + assert!(result.is_ok(), "Mock execution should succeed"); + + if let Ok(Output::Nil(_)) = result { + // Expected result + } else { + panic!("Expected Output::Nil"); + } + } + + #[tokio::test] + async fn test_mock_query_execution_failure() { + let execution = MockQueryExecution { + should_succeed: false, + should_cancel: false, + }; + + let result = execution.start().await; + assert!(result.is_err(), "Mock execution should fail"); + + if let Err(QueryError::NotImplemented { .. }) = result { + // Expected error + } else { + panic!("Expected NotImplemented error"); + } + } + + #[tokio::test] + async fn test_mock_query_execution_cancel() { + let execution = MockQueryExecution { + should_succeed: false, + should_cancel: true, + }; + + let result = execution.start().await; + assert!(result.is_err(), "Cancelled execution should fail"); + + if let Err(QueryError::Cancel) = result { + // Expected cancellation error + } else { + panic!("Expected Cancel error"); + } + + let cancel_result = execution.cancel(); + assert!(cancel_result.is_ok(), "Cancel should succeed"); + } + + #[test] + fn test_query_execution_default_type() { + let execution = MockQueryExecution { + should_succeed: true, + should_cancel: false, + }; + + assert_eq!(execution.query_type(), QueryType::Batch); + } +} diff --git a/crates/s3select-query/src/lib.rs b/crates/s3select-query/src/lib.rs index 4fab1772..cfc0d070 100644 --- a/crates/s3select-query/src/lib.rs +++ b/crates/s3select-query/src/lib.rs @@ -20,6 +20,9 @@ pub mod instance; pub mod metadata; pub mod sql; +#[cfg(test)] +mod test; + use rustfs_s3select_api::{QueryResult, server::dbms::DatabaseManagerSystem}; use s3s::dto::SelectObjectContentInput; use std::sync::{Arc, LazyLock}; @@ -75,10 +78,25 @@ pub async fn get_global_db( } /// Create a fresh database instance without using cached components (for testing) -pub async fn create_fresh_db( - input: SelectObjectContentInput, - enable_debug: bool, -) -> QueryResult> { - let db = crate::instance::make_rustfsms(Arc::new(input), enable_debug).await?; +pub async fn create_fresh_db() -> QueryResult> { + // Create a default test input for fresh database creation + let default_input = SelectObjectContentInput { + bucket: "test-bucket".to_string(), + expected_bucket_owner: None, + key: "test.csv".to_string(), + sse_customer_algorithm: None, + sse_customer_key: None, + sse_customer_key_md5: None, + request: s3s::dto::SelectObjectContentRequest { + expression: "SELECT * FROM S3Object".to_string(), + expression_type: s3s::dto::ExpressionType::from_static("SQL"), + input_serialization: s3s::dto::InputSerialization::default(), + output_serialization: s3s::dto::OutputSerialization::default(), + request_progress: None, + scan_range: None, + }, + }; + + let db = crate::instance::make_rustfsms(Arc::new(default_input), true).await?; Ok(Arc::new(db) as Arc) } diff --git a/crates/s3select-query/src/test/error_handling_test.rs b/crates/s3select-query/src/test/error_handling_test.rs new file mode 100644 index 00000000..289cdcc8 --- /dev/null +++ b/crates/s3select-query/src/test/error_handling_test.rs @@ -0,0 +1,247 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#[cfg(test)] +mod error_handling_tests { + use crate::get_global_db; + use rustfs_s3select_api::{ + QueryError, + query::{Context, Query}, + }; + use s3s::dto::{ + CSVInput, ExpressionType, FileHeaderInfo, InputSerialization, SelectObjectContentInput, SelectObjectContentRequest, + }; + use std::sync::Arc; + + fn create_test_input_with_sql(sql: &str) -> SelectObjectContentInput { + SelectObjectContentInput { + bucket: "test-bucket".to_string(), + expected_bucket_owner: None, + key: "test.csv".to_string(), + sse_customer_algorithm: None, + sse_customer_key: None, + sse_customer_key_md5: None, + request: SelectObjectContentRequest { + expression: sql.to_string(), + expression_type: ExpressionType::from_static("SQL"), + input_serialization: InputSerialization { + csv: Some(CSVInput { + file_header_info: Some(FileHeaderInfo::from_static(FileHeaderInfo::USE)), + ..Default::default() + }), + ..Default::default() + }, + output_serialization: s3s::dto::OutputSerialization::default(), + request_progress: None, + scan_range: None, + }, + } + } + + #[tokio::test] + async fn test_syntax_error_handling() { + let invalid_sqls = vec![ + "INVALID SQL", + "SELECT FROM", + "SELECT * FORM S3Object", // typo in FROM + "SELECT * FROM", + "SELECT * FROM S3Object WHERE", + "SELECT COUNT( FROM S3Object", // missing closing parenthesis + ]; + + for sql in invalid_sqls { + let input = create_test_input_with_sql(sql); + let db = get_global_db(input.clone(), true).await.unwrap(); + let query = Query::new(Context { input: Arc::new(input) }, sql.to_string()); + + let result = db.execute(&query).await; + assert!(result.is_err(), "Expected error for SQL: {sql}"); + } + } + + #[tokio::test] + async fn test_multi_statement_error() { + let multi_statement_sqls = vec![ + "SELECT * FROM S3Object; SELECT 1;", + "SELECT 1; SELECT 2; SELECT 3;", + "SELECT * FROM S3Object; DROP TABLE test;", + ]; + + for sql in multi_statement_sqls { + let input = create_test_input_with_sql(sql); + let db = get_global_db(input.clone(), true).await.unwrap(); + let query = Query::new(Context { input: Arc::new(input) }, sql.to_string()); + + let result = db.execute(&query).await; + assert!(result.is_err(), "Expected multi-statement error for SQL: {sql}"); + + if let Err(QueryError::MultiStatement { num, .. }) = result { + assert!(num >= 2, "Expected at least 2 statements, got: {num}"); + } + } + } + + #[tokio::test] + async fn test_unsupported_operations() { + let unsupported_sqls = vec![ + "INSERT INTO S3Object VALUES (1, 'test')", + "UPDATE S3Object SET name = 'test'", + "DELETE FROM S3Object", + "CREATE TABLE test (id INT)", + "DROP TABLE S3Object", + ]; + + for sql in unsupported_sqls { + let input = create_test_input_with_sql(sql); + let db = get_global_db(input.clone(), true).await.unwrap(); + let query = Query::new(Context { input: Arc::new(input) }, sql.to_string()); + + let result = db.execute(&query).await; + // These should either fail with syntax error or not implemented error + assert!(result.is_err(), "Expected error for unsupported SQL: {sql}"); + } + } + + #[tokio::test] + async fn test_invalid_column_references() { + let invalid_column_sqls = vec![ + "SELECT nonexistent_column FROM S3Object", + "SELECT * FROM S3Object WHERE nonexistent_column = 1", + "SELECT * FROM S3Object ORDER BY nonexistent_column", + "SELECT * FROM S3Object GROUP BY nonexistent_column", + ]; + + for sql in invalid_column_sqls { + let input = create_test_input_with_sql(sql); + let db = get_global_db(input.clone(), true).await.unwrap(); + let query = Query::new(Context { input: Arc::new(input) }, sql.to_string()); + + let result = db.execute(&query).await; + // These might succeed or fail depending on schema inference + // The test verifies that the system handles them gracefully + match result { + Ok(_) => { + // If it succeeds, verify we can get results + let handle = result.unwrap(); + let output = handle.result().chunk_result().await; + // Should either succeed with empty results or fail gracefully + let _ = output; + } + Err(_) => { + // Expected to fail - this is acceptable + } + } + } + } + + #[tokio::test] + async fn test_complex_query_error_recovery() { + let complex_invalid_sql = r#" + SELECT + name, + age, + INVALID_FUNCTION(salary) as invalid_calc, + department + FROM S3Object + WHERE age > 'invalid_number' + GROUP BY department, nonexistent_column + HAVING COUNT(*) > INVALID_FUNCTION() + ORDER BY invalid_column + "#; + + let input = create_test_input_with_sql(complex_invalid_sql); + let db = get_global_db(input.clone(), true).await.unwrap(); + let query = Query::new(Context { input: Arc::new(input) }, complex_invalid_sql.to_string()); + + let result = db.execute(&query).await; + assert!(result.is_err(), "Expected error for complex invalid SQL"); + } + + #[tokio::test] + async fn test_empty_query() { + let empty_sqls = vec!["", " ", "\n\t \n"]; + + for sql in empty_sqls { + let input = create_test_input_with_sql(sql); + let db = get_global_db(input.clone(), true).await.unwrap(); + let query = Query::new(Context { input: Arc::new(input) }, sql.to_string()); + + let result = db.execute(&query).await; + // Empty queries might be handled differently by the parser + match result { + Ok(_) => { + // Some parsers might accept empty queries + } + Err(_) => { + // Expected to fail for empty SQL + } + } + } + } + + #[tokio::test] + async fn test_very_long_query() { + // Create a very long but valid query + let mut long_sql = "SELECT ".to_string(); + for i in 0..1000 { + if i > 0 { + long_sql.push_str(", "); + } + long_sql.push_str(&format!("'column_{i}' as col_{i}")); + } + long_sql.push_str(" FROM S3Object LIMIT 1"); + + let input = create_test_input_with_sql(&long_sql); + let db = get_global_db(input.clone(), true).await.unwrap(); + let query = Query::new(Context { input: Arc::new(input) }, long_sql); + + let result = db.execute(&query).await; + // This should either succeed or fail gracefully + match result { + Ok(handle) => { + let output = handle.result().chunk_result().await; + assert!(output.is_ok(), "Query execution should complete successfully"); + } + Err(_) => { + // Acceptable to fail due to resource constraints + } + } + } + + #[tokio::test] + async fn test_sql_injection_patterns() { + let injection_patterns = vec![ + "SELECT * FROM S3Object WHERE name = 'test'; DROP TABLE users; --", + "SELECT * FROM S3Object UNION SELECT * FROM information_schema.tables", + "SELECT * FROM S3Object WHERE 1=1 OR 1=1", + ]; + + for sql in injection_patterns { + let input = create_test_input_with_sql(sql); + let db = get_global_db(input.clone(), true).await.unwrap(); + let query = Query::new(Context { input: Arc::new(input) }, sql.to_string()); + + let result = db.execute(&query).await; + // These should be handled safely - either succeed with limited scope or fail + match result { + Ok(_) => { + // If successful, it should only access S3Object data + } + Err(_) => { + // Expected to fail for security reasons + } + } + } + } +} diff --git a/crates/s3select-query/src/test/integration_test.rs b/crates/s3select-query/src/test/integration_test.rs new file mode 100644 index 00000000..15a84d43 --- /dev/null +++ b/crates/s3select-query/src/test/integration_test.rs @@ -0,0 +1,228 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#[cfg(test)] +mod integration_tests { + use crate::{create_fresh_db, get_global_db, instance::make_rustfsms}; + use rustfs_s3select_api::{ + QueryError, + query::{Context, Query}, + }; + use s3s::dto::{ + CSVInput, CSVOutput, ExpressionType, FileHeaderInfo, InputSerialization, OutputSerialization, SelectObjectContentInput, + SelectObjectContentRequest, + }; + use std::sync::Arc; + + fn create_test_input(sql: &str) -> SelectObjectContentInput { + SelectObjectContentInput { + bucket: "test-bucket".to_string(), + expected_bucket_owner: None, + key: "test.csv".to_string(), + sse_customer_algorithm: None, + sse_customer_key: None, + sse_customer_key_md5: None, + request: SelectObjectContentRequest { + expression: sql.to_string(), + expression_type: ExpressionType::from_static("SQL"), + input_serialization: InputSerialization { + csv: Some(CSVInput { + file_header_info: Some(FileHeaderInfo::from_static(FileHeaderInfo::USE)), + ..Default::default() + }), + ..Default::default() + }, + output_serialization: OutputSerialization { + csv: Some(CSVOutput::default()), + ..Default::default() + }, + request_progress: None, + scan_range: None, + }, + } + } + + #[tokio::test] + async fn test_database_creation() { + let input = create_test_input("SELECT * FROM S3Object"); + let result = make_rustfsms(Arc::new(input), true).await; + assert!(result.is_ok()); + } + + #[tokio::test] + async fn test_global_db_creation() { + let input = create_test_input("SELECT * FROM S3Object"); + let result = get_global_db(input.clone(), true).await; + assert!(result.is_ok()); + } + + #[tokio::test] + async fn test_fresh_db_creation() { + let result = create_fresh_db().await; + assert!(result.is_ok()); + } + + #[tokio::test] + async fn test_simple_select_query() { + let sql = "SELECT * FROM S3Object"; + let input = create_test_input(sql); + let db = get_global_db(input.clone(), true).await.unwrap(); + let query = Query::new(Context { input: Arc::new(input) }, sql.to_string()); + + let result = db.execute(&query).await; + assert!(result.is_ok()); + + let query_handle = result.unwrap(); + let output = query_handle.result().chunk_result().await; + assert!(output.is_ok()); + } + + #[tokio::test] + async fn test_select_with_where_clause() { + let sql = "SELECT name, age FROM S3Object WHERE age > 30"; + let input = create_test_input(sql); + let db = get_global_db(input.clone(), true).await.unwrap(); + let query = Query::new(Context { input: Arc::new(input) }, sql.to_string()); + + let result = db.execute(&query).await; + assert!(result.is_ok()); + } + + #[tokio::test] + async fn test_select_with_aggregation() { + let sql = "SELECT department, COUNT(*) as count FROM S3Object GROUP BY department"; + let input = create_test_input(sql); + let db = get_global_db(input.clone(), true).await.unwrap(); + let query = Query::new(Context { input: Arc::new(input) }, sql.to_string()); + + let result = db.execute(&query).await; + // Aggregation queries might fail due to lack of actual data, which is acceptable + match result { + Ok(_) => { + // If successful, that's great + } + Err(_) => { + // Expected to fail due to no actual data source + } + } + } + + #[tokio::test] + async fn test_invalid_sql_syntax() { + let sql = "INVALID SQL SYNTAX"; + let input = create_test_input(sql); + let db = get_global_db(input.clone(), true).await.unwrap(); + let query = Query::new(Context { input: Arc::new(input) }, sql.to_string()); + + let result = db.execute(&query).await; + assert!(result.is_err()); + } + + #[tokio::test] + async fn test_multi_statement_error() { + let sql = "SELECT * FROM S3Object; SELECT 1;"; + let input = create_test_input(sql); + let db = get_global_db(input.clone(), true).await.unwrap(); + let query = Query::new(Context { input: Arc::new(input) }, sql.to_string()); + + let result = db.execute(&query).await; + assert!(result.is_err()); + + if let Err(QueryError::MultiStatement { num, .. }) = result { + assert_eq!(num, 2); + } else { + panic!("Expected MultiStatement error"); + } + } + + #[tokio::test] + async fn test_query_state_machine_workflow() { + let sql = "SELECT * FROM S3Object"; + let input = create_test_input(sql); + let db = get_global_db(input.clone(), true).await.unwrap(); + let query = Query::new(Context { input: Arc::new(input) }, sql.to_string()); + + // Test state machine creation + let state_machine = db.build_query_state_machine(query.clone()).await; + assert!(state_machine.is_ok()); + + let state_machine = state_machine.unwrap(); + + // Test logical plan building + let logical_plan = db.build_logical_plan(state_machine.clone()).await; + assert!(logical_plan.is_ok()); + + // Test execution if plan exists + if let Ok(Some(plan)) = logical_plan { + let execution_result = db.execute_logical_plan(plan, state_machine).await; + assert!(execution_result.is_ok()); + } + } + + #[tokio::test] + async fn test_query_with_limit() { + let sql = "SELECT * FROM S3Object LIMIT 5"; + let input = create_test_input(sql); + let db = get_global_db(input.clone(), true).await.unwrap(); + let query = Query::new(Context { input: Arc::new(input) }, sql.to_string()); + + let result = db.execute(&query).await; + assert!(result.is_ok()); + + let query_handle = result.unwrap(); + let output = query_handle.result().chunk_result().await.unwrap(); + + // Verify that we get results (exact count depends on test data) + let total_rows: usize = output.iter().map(|batch| batch.num_rows()).sum(); + assert!(total_rows <= 5); + } + + #[tokio::test] + async fn test_query_with_order_by() { + let sql = "SELECT name, age FROM S3Object ORDER BY age DESC"; + let input = create_test_input(sql); + let db = get_global_db(input.clone(), true).await.unwrap(); + let query = Query::new(Context { input: Arc::new(input) }, sql.to_string()); + + let result = db.execute(&query).await; + assert!(result.is_ok()); + } + + #[tokio::test] + async fn test_concurrent_queries() { + let sql = "SELECT * FROM S3Object"; + let input = create_test_input(sql); + let db = get_global_db(input.clone(), true).await.unwrap(); + + // Execute multiple queries concurrently + let mut handles = vec![]; + for i in 0..3 { + let query = Query::new( + Context { + input: Arc::new(input.clone()), + }, + format!("SELECT * FROM S3Object LIMIT {}", i + 1), + ); + let db_clone = db.clone(); + let handle = tokio::spawn(async move { db_clone.execute(&query).await }); + handles.push(handle); + } + + // Wait for all queries to complete + for handle in handles { + let result = handle.await.unwrap(); + assert!(result.is_ok()); + } + } +} diff --git a/crates/s3select-query/src/test/mod.rs b/crates/s3select-query/src/test/mod.rs new file mode 100644 index 00000000..b15a5115 --- /dev/null +++ b/crates/s3select-query/src/test/mod.rs @@ -0,0 +1,18 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Test modules for s3select-query + +pub mod error_handling_test; +pub mod integration_test;