Merge pull request #301 from guojidan/improve-sql

s3Select: add unit test case
This commit is contained in:
guojidan
2025-07-28 09:56:10 +08:00
committed by GitHub
7 changed files with 782 additions and 5 deletions

View File

@@ -21,6 +21,9 @@ pub mod object_store;
pub mod query;
pub mod server;
#[cfg(test)]
mod test;
pub type QueryResult<T> = Result<T, QueryError>;
#[derive(Debug, Snafu)]
@@ -90,3 +93,82 @@ impl Display for ResolvedTable {
write!(f, "{table}")
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use datafusion::common::DataFusionError;
    use datafusion::sql::sqlparser::parser::ParserError;

    /// Pins the exact `Display` text of five `QueryError` variants built from plain values.
    #[test]
    fn test_query_error_display() {
        // Each entry pairs an error value with the message it must render to.
        let expectations = [
            (
                QueryError::NotImplemented {
                    err: String::from("feature X"),
                },
                "This feature is not implemented: feature X",
            ),
            (
                QueryError::MultiStatement {
                    num: 2,
                    sql: String::from("SELECT 1; SELECT 2;"),
                },
                "Multi-statement not allow, found num:2, sql:SELECT 1; SELECT 2;",
            ),
            (QueryError::Cancel, "The query has been canceled"),
            (
                QueryError::FunctionNotExists {
                    name: String::from("my_func"),
                },
                "Udf not exists, name:my_func.",
            ),
            (
                QueryError::StoreError {
                    e: String::from("connection failed"),
                },
                "Store Error, e:connection failed.",
            ),
        ];

        for (error, rendered) in expectations {
            assert_eq!(error.to_string(), rendered);
        }
    }

    /// Converting a `DataFusionError` with `into()` must yield `QueryError::Datafusion`
    /// whose source still mentions the original message.
    #[test]
    fn test_query_error_from_datafusion_error() {
        let converted: QueryError = DataFusionError::Plan("invalid plan".to_string()).into();

        let QueryError::Datafusion { source, .. } = converted else {
            panic!("Expected Datafusion error");
        };
        assert!(source.to_string().contains("invalid plan"));
    }

    /// A `QueryError::Parser` built around a `ParserError` must surface the parser
    /// message in its own `Display` output.
    #[test]
    fn test_query_error_from_parser_error() {
        let wrapped = QueryError::Parser {
            source: ParserError::ParserError("syntax error".to_string()),
        };

        assert!(wrapped.to_string().contains("syntax error"));
    }

    /// `ResolvedTable::table` and its `Display` impl must both return the stored name.
    #[test]
    fn test_resolved_table() {
        let resolved = ResolvedTable {
            table: String::from("my_table"),
        };

        assert_eq!(resolved.to_string(), "my_table");
        assert_eq!(resolved.table(), "my_table");
    }

    /// A clone compares equal to its origin; a table with a different name does not.
    #[test]
    fn test_resolved_table_clone_and_eq() {
        let original = ResolvedTable {
            table: String::from("table1"),
        };
        let different = ResolvedTable {
            table: String::from("table2"),
        };
        let duplicate = original.clone();

        assert_eq!(original, duplicate);
        assert_ne!(original, different);
    }
}

View File

@@ -0,0 +1,17 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Test modules for s3select-api
pub mod query_execution_test;

View File

@@ -0,0 +1,167 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#[cfg(test)]
mod tests {
    use crate::query::execution::{DONE, Output, QueryExecution, QueryState, QueryType, RUNNING};
    use crate::{QueryError, QueryResult};
    use async_trait::async_trait;

    /// `QueryType` renders as a lowercase word through `Display`.
    #[test]
    fn test_query_type_display() {
        assert_eq!(QueryType::Batch.to_string(), "batch");
        assert_eq!(QueryType::Stream.to_string(), "stream");
    }

    /// Equal variants compare equal; different variants do not.
    #[test]
    fn test_query_type_equality() {
        assert_eq!(QueryType::Batch, QueryType::Batch);
        assert_eq!(QueryType::Stream, QueryType::Stream);
        assert_ne!(QueryType::Batch, QueryType::Stream);
    }

    /// Every accessor is exercised on its own `Output::Nil` value.
    #[tokio::test]
    async fn test_output_nil_methods() {
        let chunks = Output::Nil(()).chunk_result().await;
        assert!(chunks.is_ok(), "Output::Nil result should be Ok");

        let row_count = Output::Nil(()).num_rows().await;
        assert_eq!(row_count, 0, "Output::Nil should have 0 rows");

        let affected_count = Output::Nil(()).affected_rows().await;
        assert_eq!(affected_count, 0, "Output::Nil should have 0 affected rows");
    }

    /// `QueryState::as_ref` yields the name of the state, or of the nested
    /// `RUNNING` / `DONE` value when one is wrapped.
    #[test]
    fn test_query_state_as_ref() {
        let cases = [
            (QueryState::ACCEPTING, "ACCEPTING"),
            (QueryState::RUNNING(RUNNING::ANALYZING), "ANALYZING"),
            (QueryState::DONE(DONE::FINISHED), "FINISHED"),
        ];

        for (state, label) in cases {
            assert_eq!(state.as_ref(), label);
        }
    }

    /// Each `RUNNING` variant referenced here maps to its own identifier.
    #[test]
    fn test_running_state_as_ref() {
        // `OPTMIZING` is the enum's own spelling of that variant.
        let cases = [
            (RUNNING::DISPATCHING, "DISPATCHING"),
            (RUNNING::ANALYZING, "ANALYZING"),
            (RUNNING::OPTMIZING, "OPTMIZING"),
            (RUNNING::SCHEDULING, "SCHEDULING"),
        ];

        for (phase, label) in cases {
            assert_eq!(phase.as_ref(), label);
        }
    }

    /// Each `DONE` variant referenced here maps to its own identifier.
    #[test]
    fn test_done_state_as_ref() {
        let cases = [
            (DONE::FINISHED, "FINISHED"),
            (DONE::FAILED, "FAILED"),
            (DONE::CANCELLED, "CANCELLED"),
        ];

        for (outcome, label) in cases {
            assert_eq!(outcome.as_ref(), label);
        }
    }

    /// Test double whose `start` result is selected by two flags.
    ///
    /// `should_cancel` takes precedence over `should_succeed`.
    struct MockQueryExecution {
        should_succeed: bool,
        should_cancel: bool,
    }

    #[async_trait]
    impl QueryExecution for MockQueryExecution {
        /// Returns `Cancel` when cancellation is requested, otherwise `Output::Nil`
        /// on success or `NotImplemented` on failure.
        async fn start(&self) -> QueryResult<Output> {
            match (self.should_cancel, self.should_succeed) {
                (true, _) => Err(QueryError::Cancel),
                (false, true) => Ok(Output::Nil(())),
                (false, false) => Err(QueryError::NotImplemented {
                    err: "Mock execution failed".to_string(),
                }),
            }
        }

        /// Cancelling the mock always succeeds.
        fn cancel(&self) -> QueryResult<()> {
            Ok(())
        }
    }

    /// A succeeding mock returns `Ok(Output::Nil)`.
    #[tokio::test]
    async fn test_mock_query_execution_success() {
        let mock = MockQueryExecution {
            should_succeed: true,
            should_cancel: false,
        };

        let outcome = mock.start().await;

        assert!(outcome.is_ok(), "Mock execution should succeed");
        assert!(matches!(outcome, Ok(Output::Nil(_))), "Expected Output::Nil");
    }

    /// A failing mock returns `QueryError::NotImplemented`.
    #[tokio::test]
    async fn test_mock_query_execution_failure() {
        let mock = MockQueryExecution {
            should_succeed: false,
            should_cancel: false,
        };

        let outcome = mock.start().await;

        assert!(outcome.is_err(), "Mock execution should fail");
        assert!(
            matches!(outcome, Err(QueryError::NotImplemented { .. })),
            "Expected NotImplemented error"
        );
    }

    /// A cancelling mock returns `QueryError::Cancel`, and `cancel` itself succeeds.
    #[tokio::test]
    async fn test_mock_query_execution_cancel() {
        let mock = MockQueryExecution {
            should_succeed: false,
            should_cancel: true,
        };

        let outcome = mock.start().await;

        assert!(outcome.is_err(), "Cancelled execution should fail");
        assert!(matches!(outcome, Err(QueryError::Cancel)), "Expected Cancel error");

        assert!(mock.cancel().is_ok(), "Cancel should succeed");
    }

    /// The mock does not define `query_type`, so this checks the value supplied by
    /// the `QueryExecution` trait itself.
    #[test]
    fn test_query_execution_default_type() {
        let mock = MockQueryExecution {
            should_succeed: true,
            should_cancel: false,
        };

        assert_eq!(mock.query_type(), QueryType::Batch);
    }
}

View File

@@ -20,6 +20,9 @@ pub mod instance;
pub mod metadata;
pub mod sql;
#[cfg(test)]
mod test;
use rustfs_s3select_api::{QueryResult, server::dbms::DatabaseManagerSystem};
use s3s::dto::SelectObjectContentInput;
use std::sync::{Arc, LazyLock};
@@ -75,10 +78,25 @@ pub async fn get_global_db(
}
/// Create a fresh database instance without using cached components (for testing)
pub async fn create_fresh_db(
input: SelectObjectContentInput,
enable_debug: bool,
) -> QueryResult<Arc<dyn DatabaseManagerSystem + Send + Sync>> {
let db = crate::instance::make_rustfsms(Arc::new(input), enable_debug).await?;
/// Builds a new database instance from a hard-coded placeholder request.
///
/// The placeholder targets bucket `test-bucket` and key `test.csv`, carries the
/// SQL expression `SELECT * FROM S3Object`, and uses default input and output
/// serialization. `make_rustfsms` is always called with `true` as its second
/// argument.
///
/// # Errors
///
/// Propagates any error returned by `crate::instance::make_rustfsms`.
pub async fn create_fresh_db() -> QueryResult<Arc<dyn DatabaseManagerSystem + Send + Sync>> {
    // Assemble the nested request first so the outer struct literal stays flat.
    let request = s3s::dto::SelectObjectContentRequest {
        expression: "SELECT * FROM S3Object".to_string(),
        expression_type: s3s::dto::ExpressionType::from_static("SQL"),
        input_serialization: s3s::dto::InputSerialization::default(),
        output_serialization: s3s::dto::OutputSerialization::default(),
        request_progress: None,
        scan_range: None,
    };
    let placeholder_input = SelectObjectContentInput {
        bucket: "test-bucket".to_string(),
        expected_bucket_owner: None,
        key: "test.csv".to_string(),
        sse_customer_algorithm: None,
        sse_customer_key: None,
        sse_customer_key_md5: None,
        request,
    };

    let instance = crate::instance::make_rustfsms(Arc::new(placeholder_input), true).await?;

    // The annotated binding performs the unsizing coercion to the trait object.
    let shared: Arc<dyn DatabaseManagerSystem + Send + Sync> = Arc::new(instance);
    Ok(shared)
}

View File

@@ -0,0 +1,247 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#[cfg(test)]
mod error_handling_tests {
    use crate::get_global_db;
    use rustfs_s3select_api::{
        QueryError,
        query::{Context, Query},
    };
    use s3s::dto::{
        CSVInput, ExpressionType, FileHeaderInfo, InputSerialization, SelectObjectContentInput, SelectObjectContentRequest,
    };
    use std::sync::Arc;

    /// Builds a select request for `test-bucket`/`test.csv` carrying `sql` as its
    /// expression, with CSV input whose header mode is `FileHeaderInfo::USE` and
    /// default output serialization.
    fn create_test_input_with_sql(sql: &str) -> SelectObjectContentInput {
        let csv = CSVInput {
            file_header_info: Some(FileHeaderInfo::from_static(FileHeaderInfo::USE)),
            ..Default::default()
        };
        let input_serialization = InputSerialization {
            csv: Some(csv),
            ..Default::default()
        };
        let request = SelectObjectContentRequest {
            expression: sql.to_string(),
            expression_type: ExpressionType::from_static("SQL"),
            input_serialization,
            output_serialization: s3s::dto::OutputSerialization::default(),
            request_progress: None,
            scan_range: None,
        };

        SelectObjectContentInput {
            bucket: "test-bucket".to_string(),
            expected_bucket_owner: None,
            key: "test.csv".to_string(),
            sse_customer_algorithm: None,
            sse_customer_key: None,
            sse_customer_key_md5: None,
            request,
        }
    }

    /// Every malformed statement must be rejected by `execute`.
    #[tokio::test]
    async fn test_syntax_error_handling() {
        let malformed = [
            "INVALID SQL",
            "SELECT FROM",
            "SELECT * FORM S3Object", // typo in FROM
            "SELECT * FROM",
            "SELECT * FROM S3Object WHERE",
            "SELECT COUNT( FROM S3Object", // missing closing parenthesis
        ];

        for sql in malformed {
            let request = create_test_input_with_sql(sql);
            let db = get_global_db(request.clone(), true).await.unwrap();
            let context = Context { input: Arc::new(request) };
            let query = Query::new(context, sql.to_string());

            let outcome = db.execute(&query).await;

            assert!(outcome.is_err(), "Expected error for SQL: {sql}");
        }
    }

    /// Inputs containing several statements must be rejected; when the error is
    /// `MultiStatement`, it must report at least two statements.
    #[tokio::test]
    async fn test_multi_statement_error() {
        let batches = [
            "SELECT * FROM S3Object; SELECT 1;",
            "SELECT 1; SELECT 2; SELECT 3;",
            "SELECT * FROM S3Object; DROP TABLE test;",
        ];

        for sql in batches {
            let request = create_test_input_with_sql(sql);
            let db = get_global_db(request.clone(), true).await.unwrap();
            let context = Context { input: Arc::new(request) };
            let query = Query::new(context, sql.to_string());

            let outcome = db.execute(&query).await;

            assert!(outcome.is_err(), "Expected multi-statement error for SQL: {sql}");
            // NOTE(review): any other error variant passes without further checks;
            // confirm whether that leniency is intended.
            if let Err(QueryError::MultiStatement { num, .. }) = outcome {
                assert!(num >= 2, "Expected at least 2 statements, got: {num}");
            }
        }
    }

    /// Statements other than `SELECT` must produce an error of some kind.
    #[tokio::test]
    async fn test_unsupported_operations() {
        let unsupported = [
            "INSERT INTO S3Object VALUES (1, 'test')",
            "UPDATE S3Object SET name = 'test'",
            "DELETE FROM S3Object",
            "CREATE TABLE test (id INT)",
            "DROP TABLE S3Object",
        ];

        for sql in unsupported {
            let request = create_test_input_with_sql(sql);
            let db = get_global_db(request.clone(), true).await.unwrap();
            let context = Context { input: Arc::new(request) };
            let query = Query::new(context, sql.to_string());

            let outcome = db.execute(&query).await;

            // Either a syntax error or a not-implemented error is acceptable here.
            assert!(outcome.is_err(), "Expected error for unsupported SQL: {sql}");
        }
    }

    /// Queries naming unknown columns may succeed or fail; the test only drives
    /// them to completion and makes no assertion about the outcome.
    #[tokio::test]
    async fn test_invalid_column_references() {
        let unknown_columns = [
            "SELECT nonexistent_column FROM S3Object",
            "SELECT * FROM S3Object WHERE nonexistent_column = 1",
            "SELECT * FROM S3Object ORDER BY nonexistent_column",
            "SELECT * FROM S3Object GROUP BY nonexistent_column",
        ];

        for sql in unknown_columns {
            let request = create_test_input_with_sql(sql);
            let db = get_global_db(request.clone(), true).await.unwrap();
            let context = Context { input: Arc::new(request) };
            let query = Query::new(context, sql.to_string());

            // An `Err` from `execute` is acceptable and simply skipped.
            if let Ok(handle) = db.execute(&query).await {
                // Collect the chunks as well; success and failure are both tolerated.
                let _chunks = handle.result().chunk_result().await;
            }
        }
    }

    /// A statement combining several invalid constructs must be rejected.
    #[tokio::test]
    async fn test_complex_query_error_recovery() {
        let complex_invalid_sql = r#"
SELECT
name,
age,
INVALID_FUNCTION(salary) as invalid_calc,
department
FROM S3Object
WHERE age > 'invalid_number'
GROUP BY department, nonexistent_column
HAVING COUNT(*) > INVALID_FUNCTION()
ORDER BY invalid_column
"#;

        let request = create_test_input_with_sql(complex_invalid_sql);
        let db = get_global_db(request.clone(), true).await.unwrap();
        let context = Context { input: Arc::new(request) };
        let query = Query::new(context, complex_invalid_sql.to_string());

        let outcome = db.execute(&query).await;

        assert!(outcome.is_err(), "Expected error for complex invalid SQL");
    }

    /// Empty and whitespace-only inputs are executed without asserting on the
    /// outcome; the test passes as long as `execute` returns.
    #[tokio::test]
    async fn test_empty_query() {
        let blanks = ["", " ", "\n\t \n"];

        for sql in blanks {
            let request = create_test_input_with_sql(sql);
            let db = get_global_db(request.clone(), true).await.unwrap();
            let context = Context { input: Arc::new(request) };
            let query = Query::new(context, sql.to_string());

            // Acceptance and rejection of an empty statement are both tolerated.
            let _outcome = db.execute(&query).await;
        }
    }

    /// A 1000-column projection either fails at `execute`, or, once accepted, must
    /// produce its chunks successfully.
    #[tokio::test]
    async fn test_very_long_query() {
        // Each projected item is a string literal with a numbered alias.
        let projections: Vec<String> = (0..1000).map(|i| format!("'column_{i}' as col_{i}")).collect();
        let long_sql = format!("SELECT {} FROM S3Object LIMIT 1", projections.join(", "));

        let request = create_test_input_with_sql(&long_sql);
        let db = get_global_db(request.clone(), true).await.unwrap();
        let context = Context { input: Arc::new(request) };
        let query = Query::new(context, long_sql);

        // Failing up front (for example due to resource constraints) is acceptable.
        if let Ok(handle) = db.execute(&query).await {
            let chunks = handle.result().chunk_result().await;
            assert!(chunks.is_ok(), "Query execution should complete successfully");
        }
    }

    /// Injection-style inputs are executed without asserting on the outcome; the
    /// test passes as long as `execute` returns.
    #[tokio::test]
    async fn test_sql_injection_patterns() {
        let patterns = [
            "SELECT * FROM S3Object WHERE name = 'test'; DROP TABLE users; --",
            "SELECT * FROM S3Object UNION SELECT * FROM information_schema.tables",
            "SELECT * FROM S3Object WHERE 1=1 OR 1=1",
        ];

        for sql in patterns {
            let request = create_test_input_with_sql(sql);
            let db = get_global_db(request.clone(), true).await.unwrap();
            let context = Context { input: Arc::new(request) };
            let query = Query::new(context, sql.to_string());

            // Success and failure are both tolerated here.
            let _outcome = db.execute(&query).await;
        }
    }
}

View File

@@ -0,0 +1,228 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#[cfg(test)]
mod integration_tests {
    use crate::{create_fresh_db, get_global_db, instance::make_rustfsms};
    use rustfs_s3select_api::{
        QueryError,
        query::{Context, Query},
    };
    use s3s::dto::{
        CSVInput, CSVOutput, ExpressionType, FileHeaderInfo, InputSerialization, OutputSerialization, SelectObjectContentInput,
        SelectObjectContentRequest,
    };
    use std::sync::Arc;

    /// Builds a select request for `test-bucket`/`test.csv` carrying `sql` as its
    /// expression, with CSV input whose header mode is `FileHeaderInfo::USE` and
    /// default CSV output.
    fn create_test_input(sql: &str) -> SelectObjectContentInput {
        let csv_input = CSVInput {
            file_header_info: Some(FileHeaderInfo::from_static(FileHeaderInfo::USE)),
            ..Default::default()
        };
        let input_serialization = InputSerialization {
            csv: Some(csv_input),
            ..Default::default()
        };
        let output_serialization = OutputSerialization {
            csv: Some(CSVOutput::default()),
            ..Default::default()
        };
        let request = SelectObjectContentRequest {
            expression: sql.to_string(),
            expression_type: ExpressionType::from_static("SQL"),
            input_serialization,
            output_serialization,
            request_progress: None,
            scan_range: None,
        };

        SelectObjectContentInput {
            bucket: "test-bucket".to_string(),
            expected_bucket_owner: None,
            key: "test.csv".to_string(),
            sse_customer_algorithm: None,
            sse_customer_key: None,
            sse_customer_key_md5: None,
            request,
        }
    }

    /// `make_rustfsms` succeeds for a plain select request.
    #[tokio::test]
    async fn test_database_creation() {
        let request = Arc::new(create_test_input("SELECT * FROM S3Object"));

        let created = make_rustfsms(request, true).await;

        assert!(created.is_ok());
    }

    /// `get_global_db` succeeds for a plain select request.
    #[tokio::test]
    async fn test_global_db_creation() {
        let request = create_test_input("SELECT * FROM S3Object");

        let obtained = get_global_db(request, true).await;

        assert!(obtained.is_ok());
    }

    /// `create_fresh_db` succeeds.
    #[tokio::test]
    async fn test_fresh_db_creation() {
        assert!(create_fresh_db().await.is_ok());
    }

    /// `SELECT *` executes and its chunks can be collected.
    #[tokio::test]
    async fn test_simple_select_query() {
        let sql = "SELECT * FROM S3Object";
        let request = create_test_input(sql);
        let db = get_global_db(request.clone(), true).await.unwrap();
        let context = Context { input: Arc::new(request) };
        let query = Query::new(context, sql.to_string());

        let executed = db.execute(&query).await;
        assert!(executed.is_ok());

        let chunks = executed.unwrap().result().chunk_result().await;
        assert!(chunks.is_ok());
    }

    /// A projection with a `WHERE` filter is accepted by `execute`.
    #[tokio::test]
    async fn test_select_with_where_clause() {
        let sql = "SELECT name, age FROM S3Object WHERE age > 30";
        let request = create_test_input(sql);
        let db = get_global_db(request.clone(), true).await.unwrap();
        let context = Context { input: Arc::new(request) };
        let query = Query::new(context, sql.to_string());

        let executed = db.execute(&query).await;

        assert!(executed.is_ok());
    }

    /// An aggregation is executed without asserting on the outcome, since it might
    /// fail when no actual data source is available.
    #[tokio::test]
    async fn test_select_with_aggregation() {
        let sql = "SELECT department, COUNT(*) as count FROM S3Object GROUP BY department";
        let request = create_test_input(sql);
        let db = get_global_db(request.clone(), true).await.unwrap();
        let context = Context { input: Arc::new(request) };
        let query = Query::new(context, sql.to_string());

        // Success and failure are both tolerated here.
        let _executed = db.execute(&query).await;
    }

    /// Text that is not SQL is rejected by `execute`.
    #[tokio::test]
    async fn test_invalid_sql_syntax() {
        let sql = "INVALID SQL SYNTAX";
        let request = create_test_input(sql);
        let db = get_global_db(request.clone(), true).await.unwrap();
        let context = Context { input: Arc::new(request) };
        let query = Query::new(context, sql.to_string());

        let executed = db.execute(&query).await;

        assert!(executed.is_err());
    }

    /// Two statements in one input yield `MultiStatement` reporting exactly two.
    #[tokio::test]
    async fn test_multi_statement_error() {
        let sql = "SELECT * FROM S3Object; SELECT 1;";
        let request = create_test_input(sql);
        let db = get_global_db(request.clone(), true).await.unwrap();
        let context = Context { input: Arc::new(request) };
        let query = Query::new(context, sql.to_string());

        let executed = db.execute(&query).await;
        assert!(executed.is_err());

        match executed {
            Err(QueryError::MultiStatement { num, .. }) => assert_eq!(num, 2),
            _ => panic!("Expected MultiStatement error"),
        }
    }

    /// Walks the staged API: build the state machine, build the logical plan, and
    /// execute the plan when one is produced.
    #[tokio::test]
    async fn test_query_state_machine_workflow() {
        let sql = "SELECT * FROM S3Object";
        let request = create_test_input(sql);
        let db = get_global_db(request.clone(), true).await.unwrap();
        let context = Context { input: Arc::new(request) };
        let query = Query::new(context, sql.to_string());

        // Stage 1: state machine creation.
        let machine_outcome = db.build_query_state_machine(query.clone()).await;
        assert!(machine_outcome.is_ok());
        let machine = machine_outcome.unwrap();

        // Stage 2: logical plan building.
        let plan_outcome = db.build_logical_plan(machine.clone()).await;
        assert!(plan_outcome.is_ok());

        // Stage 3: execution, only when a plan was actually produced.
        if let Ok(Some(plan)) = plan_outcome {
            let run_outcome = db.execute_logical_plan(plan, machine).await;
            assert!(run_outcome.is_ok());
        }
    }

    /// `LIMIT 5` never yields more than five rows across all returned batches.
    #[tokio::test]
    async fn test_query_with_limit() {
        let sql = "SELECT * FROM S3Object LIMIT 5";
        let request = create_test_input(sql);
        let db = get_global_db(request.clone(), true).await.unwrap();
        let context = Context { input: Arc::new(request) };
        let query = Query::new(context, sql.to_string());

        let executed = db.execute(&query).await;
        assert!(executed.is_ok());

        let batches = executed.unwrap().result().chunk_result().await.unwrap();

        // The exact count depends on the test data, so only the upper bound is checked.
        let row_total = batches.iter().map(|batch| batch.num_rows()).sum::<usize>();
        assert!(row_total <= 5);
    }

    /// A projection with `ORDER BY ... DESC` is accepted by `execute`.
    #[tokio::test]
    async fn test_query_with_order_by() {
        let sql = "SELECT name, age FROM S3Object ORDER BY age DESC";
        let request = create_test_input(sql);
        let db = get_global_db(request.clone(), true).await.unwrap();
        let context = Context { input: Arc::new(request) };
        let query = Query::new(context, sql.to_string());

        let executed = db.execute(&query).await;

        assert!(executed.is_ok());
    }

    /// Three queries with limits 1, 2 and 3 run on spawned tasks against the same
    /// database handle, and each one must succeed.
    #[tokio::test]
    async fn test_concurrent_queries() {
        let sql = "SELECT * FROM S3Object";
        let request = create_test_input(sql);
        let db = get_global_db(request.clone(), true).await.unwrap();

        // Spawn every task before awaiting any of them.
        let tasks: Vec<_> = (1..=3)
            .map(|limit| {
                let context = Context {
                    input: Arc::new(request.clone()),
                };
                let query = Query::new(context, format!("SELECT * FROM S3Object LIMIT {limit}"));
                let worker_db = db.clone();
                tokio::spawn(async move { worker_db.execute(&query).await })
            })
            .collect();

        // Join in spawn order; a panicked task fails the test via `unwrap`.
        for task in tasks {
            let executed = task.await.unwrap();
            assert!(executed.is_ok());
        }
    }
}

View File

@@ -0,0 +1,18 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Test modules for s3select-query
pub mod error_handling_test;
pub mod integration_test;