add test case

Signed-off-by: junxiang Mu <1948535941@qq.com>
junxiang Mu
2025-03-14 04:35:36 +00:00
parent 4d423b6510
commit b145dfd437
8 changed files with 1003 additions and 3173 deletions

View File

@@ -1,10 +1,9 @@
 // automatically generated by the FlatBuffers compiler, do not modify
 // @generated
-use core::mem;
 use core::cmp::Ordering;
+use core::mem;
 extern crate flatbuffers;
 use self::flatbuffers::{EndianScalar, Follow};
@@ -12,112 +11,114 @@ use self::flatbuffers::{EndianScalar, Follow};
 #[allow(unused_imports, dead_code)]
 pub mod models {
-  use core::mem;
-  use core::cmp::Ordering;
-  extern crate flatbuffers;
-  use self::flatbuffers::{EndianScalar, Follow};
-  pub enum PingBodyOffset {}
-  #[derive(Copy, Clone, PartialEq)]
-  pub struct PingBody<'a> {
-    pub _tab: flatbuffers::Table<'a>,
-  }
-  impl<'a> flatbuffers::Follow<'a> for PingBody<'a> {
-    type Inner = PingBody<'a>;
-    #[inline]
-    unsafe fn follow(buf: &'a [u8], loc: usize) -> Self::Inner {
-      Self { _tab: flatbuffers::Table::new(buf, loc) }
-    }
-  }
-  impl<'a> PingBody<'a> {
-    pub const VT_PAYLOAD: flatbuffers::VOffsetT = 4;
-    pub const fn get_fully_qualified_name() -> &'static str {
-      "models.PingBody"
-    }
-    #[inline]
-    pub unsafe fn init_from_table(table: flatbuffers::Table<'a>) -> Self {
-      PingBody { _tab: table }
-    }
-    #[allow(unused_mut)]
-    pub fn create<'bldr: 'args, 'args: 'mut_bldr, 'mut_bldr, A: flatbuffers::Allocator + 'bldr>(
-      _fbb: &'mut_bldr mut flatbuffers::FlatBufferBuilder<'bldr, A>,
-      args: &'args PingBodyArgs<'args>
-    ) -> flatbuffers::WIPOffset<PingBody<'bldr>> {
-      let mut builder = PingBodyBuilder::new(_fbb);
-      if let Some(x) = args.payload { builder.add_payload(x); }
-      builder.finish()
-    }
-    #[inline]
-    pub fn payload(&self) -> Option<flatbuffers::Vector<'a, u8>> {
-      // Safety:
-      // Created from valid Table for this object
-      // which contains a valid value in this slot
-      unsafe { self._tab.get::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'a, u8>>>(PingBody::VT_PAYLOAD, None)}
-    }
-  }
-  impl flatbuffers::Verifiable for PingBody<'_> {
-    #[inline]
-    fn run_verifier(
-      v: &mut flatbuffers::Verifier, pos: usize
-    ) -> Result<(), flatbuffers::InvalidFlatbuffer> {
-      use self::flatbuffers::Verifiable;
-      v.visit_table(pos)?
-        .visit_field::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'_, u8>>>("payload", Self::VT_PAYLOAD, false)?
-        .finish();
-      Ok(())
-    }
-  }
-  pub struct PingBodyArgs<'a> {
-    pub payload: Option<flatbuffers::WIPOffset<flatbuffers::Vector<'a, u8>>>,
-  }
-  impl<'a> Default for PingBodyArgs<'a> {
-    #[inline]
-    fn default() -> Self {
-      PingBodyArgs {
-        payload: None,
-      }
-    }
-  }
-  pub struct PingBodyBuilder<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> {
-    fbb_: &'b mut flatbuffers::FlatBufferBuilder<'a, A>,
-    start_: flatbuffers::WIPOffset<flatbuffers::TableUnfinishedWIPOffset>,
-  }
-  impl<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> PingBodyBuilder<'a, 'b, A> {
-    #[inline]
-    pub fn add_payload(&mut self, payload: flatbuffers::WIPOffset<flatbuffers::Vector<'b , u8>>) {
-      self.fbb_.push_slot_always::<flatbuffers::WIPOffset<_>>(PingBody::VT_PAYLOAD, payload);
-    }
-    #[inline]
-    pub fn new(_fbb: &'b mut flatbuffers::FlatBufferBuilder<'a, A>) -> PingBodyBuilder<'a, 'b, A> {
-      let start = _fbb.start_table();
-      PingBodyBuilder {
-        fbb_: _fbb,
-        start_: start,
-      }
-    }
-    #[inline]
-    pub fn finish(self) -> flatbuffers::WIPOffset<PingBody<'a>> {
-      let o = self.fbb_.end_table(self.start_);
-      flatbuffers::WIPOffset::new(o.value())
-    }
-  }
-  impl core::fmt::Debug for PingBody<'_> {
-    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
-      let mut ds = f.debug_struct("PingBody");
-      ds.field("payload", &self.payload());
-      ds.finish()
-    }
-  }
-} // pub mod models
+    use core::cmp::Ordering;
+    use core::mem;
+    extern crate flatbuffers;
+    use self::flatbuffers::{EndianScalar, Follow};
+    pub enum PingBodyOffset {}
+    #[derive(Copy, Clone, PartialEq)]
+    pub struct PingBody<'a> {
+        pub _tab: flatbuffers::Table<'a>,
+    }
+    impl<'a> flatbuffers::Follow<'a> for PingBody<'a> {
+        type Inner = PingBody<'a>;
+        #[inline]
+        unsafe fn follow(buf: &'a [u8], loc: usize) -> Self::Inner {
+            Self {
+                _tab: flatbuffers::Table::new(buf, loc),
+            }
+        }
+    }
+    impl<'a> PingBody<'a> {
+        pub const VT_PAYLOAD: flatbuffers::VOffsetT = 4;
+        pub const fn get_fully_qualified_name() -> &'static str {
+            "models.PingBody"
+        }
+        #[inline]
+        pub unsafe fn init_from_table(table: flatbuffers::Table<'a>) -> Self {
+            PingBody { _tab: table }
+        }
+        #[allow(unused_mut)]
+        pub fn create<'bldr: 'args, 'args: 'mut_bldr, 'mut_bldr, A: flatbuffers::Allocator + 'bldr>(
+            _fbb: &'mut_bldr mut flatbuffers::FlatBufferBuilder<'bldr, A>,
+            args: &'args PingBodyArgs<'args>,
+        ) -> flatbuffers::WIPOffset<PingBody<'bldr>> {
+            let mut builder = PingBodyBuilder::new(_fbb);
+            if let Some(x) = args.payload {
+                builder.add_payload(x);
+            }
+            builder.finish()
+        }
+        #[inline]
+        pub fn payload(&self) -> Option<flatbuffers::Vector<'a, u8>> {
+            // Safety:
+            // Created from valid Table for this object
+            // which contains a valid value in this slot
+            unsafe {
+                self._tab
+                    .get::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'a, u8>>>(PingBody::VT_PAYLOAD, None)
+            }
+        }
+    }
+    impl flatbuffers::Verifiable for PingBody<'_> {
+        #[inline]
+        fn run_verifier(v: &mut flatbuffers::Verifier, pos: usize) -> Result<(), flatbuffers::InvalidFlatbuffer> {
+            use self::flatbuffers::Verifiable;
+            v.visit_table(pos)?
+                .visit_field::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'_, u8>>>("payload", Self::VT_PAYLOAD, false)?
+                .finish();
+            Ok(())
+        }
+    }
+    pub struct PingBodyArgs<'a> {
+        pub payload: Option<flatbuffers::WIPOffset<flatbuffers::Vector<'a, u8>>>,
+    }
+    impl<'a> Default for PingBodyArgs<'a> {
+        #[inline]
+        fn default() -> Self {
+            PingBodyArgs { payload: None }
+        }
+    }
+    pub struct PingBodyBuilder<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> {
+        fbb_: &'b mut flatbuffers::FlatBufferBuilder<'a, A>,
+        start_: flatbuffers::WIPOffset<flatbuffers::TableUnfinishedWIPOffset>,
+    }
+    impl<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> PingBodyBuilder<'a, 'b, A> {
+        #[inline]
+        pub fn add_payload(&mut self, payload: flatbuffers::WIPOffset<flatbuffers::Vector<'b, u8>>) {
+            self.fbb_
+                .push_slot_always::<flatbuffers::WIPOffset<_>>(PingBody::VT_PAYLOAD, payload);
+        }
+        #[inline]
+        pub fn new(_fbb: &'b mut flatbuffers::FlatBufferBuilder<'a, A>) -> PingBodyBuilder<'a, 'b, A> {
+            let start = _fbb.start_table();
+            PingBodyBuilder {
+                fbb_: _fbb,
+                start_: start,
+            }
+        }
+        #[inline]
+        pub fn finish(self) -> flatbuffers::WIPOffset<PingBody<'a>> {
+            let o = self.fbb_.end_table(self.start_);
+            flatbuffers::WIPOffset::new(o.value())
+        }
+    }
+    impl core::fmt::Debug for PingBody<'_> {
+        fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
+            let mut ds = f.debug_struct("PingBody");
+            ds.field("payload", &self.payload());
+            ds.finish()
+        }
+    }
+} // pub mod models
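
Reviewer note: this hunk only reorders imports and applies rustfmt to the generated file; the public API is unchanged. For orientation, a round trip through that API looks roughly like this (a minimal sketch, not part of the commit; it assumes the generated models module is in scope and the flatbuffers crate is a dependency):

use models::{PingBody, PingBodyArgs};

fn main() {
    // Serialize: build a PingBody whose payload is a small byte vector.
    let mut fbb = flatbuffers::FlatBufferBuilder::new();
    let payload = fbb.create_vector(&[1u8, 2, 3]);
    let body = PingBody::create(&mut fbb, &PingBodyArgs { payload: Some(payload) });
    fbb.finish(body, None);

    // Deserialize: `root` runs the Verifiable impl before exposing the table.
    let ping = flatbuffers::root::<PingBody>(fbb.finished_data()).expect("valid buffer");
    assert_eq!(ping.payload().map(|v| v.bytes()), Some(&[1u8, 2, 3][..]));
}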

File diff suppressed because it is too large

View File

@@ -1880,7 +1880,7 @@ impl S3 for FS {
         let input = req.input;
         info!("{:?}", input);
-        let db = make_cnosdbms(input.clone()).await.map_err(|e| {
+        let db = make_cnosdbms(input.clone(), false).await.map_err(|e| {
             error!("make db failed, {}", e.to_string());
             s3_error!(InternalError)
         })?;

View File

@@ -38,6 +38,9 @@ pub enum QueryError {
     #[snafu(display("Udf already exists, name:{}.", name))]
     FunctionExists { name: String },
+    #[snafu(display("Store Error, e:{}.", e))]
+    StoreError { e: String },
 }
 impl From<DataFusionError> for QueryError {
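
The new StoreError variant follows the crate's existing snafu pattern, so the display attribute fully determines the message. A self-contained sketch of that behavior (enum trimmed to the one variant; the main function is for illustration only):

use snafu::Snafu;

#[derive(Debug, Snafu)]
enum QueryError {
    // Same attribute as the variant added above.
    #[snafu(display("Store Error, e:{}.", e))]
    StoreError { e: String },
}

fn main() {
    let err = QueryError::StoreError { e: "put failed".to_string() };
    // The display attribute drives Display, and therefore to_string().
    assert_eq!(err.to_string(), "Store Error, e:put failed.");
}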

View File

@@ -1,9 +1,12 @@
 use std::sync::Arc;
+use bytes::Bytes;
 use datafusion::{
     execution::{context::SessionState, runtime_env::RuntimeEnvBuilder, SessionStateBuilder},
     prelude::SessionContext,
 };
+use object_store::{memory::InMemory, path::Path, ObjectStore};
+use tracing::error;
 use crate::{object_store::EcObjectStore, QueryError, QueryResult};
@@ -27,11 +30,13 @@ pub struct SessionCtxDesc {
 }
 #[derive(Default)]
-pub struct SessionCtxFactory {}
+pub struct SessionCtxFactory {
+    pub is_test: bool,
+}
 impl SessionCtxFactory {
-    pub fn create_session_ctx(&self, context: &Context) -> QueryResult<SessionCtx> {
-        let df_session_ctx = self.build_df_session_context(context)?;
+    pub async fn create_session_ctx(&self, context: &Context) -> QueryResult<SessionCtx> {
+        let df_session_ctx = self.build_df_session_context(context).await?;
         Ok(SessionCtx {
             _desc: Arc::new(SessionCtxDesc {}),
@@ -39,17 +44,41 @@ impl SessionCtxFactory {
         })
     }
-    fn build_df_session_context(&self, context: &Context) -> QueryResult<SessionContext> {
+    async fn build_df_session_context(&self, context: &Context) -> QueryResult<SessionContext> {
         let path = format!("s3://{}", context.input.bucket);
         let store_url = url::Url::parse(&path).unwrap();
-        let store = EcObjectStore::new(context.input.clone()).map_err(|_| QueryError::NotImplemented { err: String::new() })?;
         let rt = RuntimeEnvBuilder::new().build()?;
         let df_session_state = SessionStateBuilder::new()
             .with_runtime_env(Arc::new(rt))
-            .with_object_store(&store_url, Arc::new(store))
-            .with_default_features()
-            .build();
+            .with_default_features();
+        let df_session_state = if self.is_test {
+            let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
+            let data = b"id,name,age,department,salary
+            1,Alice,25,HR,5000
+            2,Bob,30,IT,6000
+            3,Charlie,35,Finance,7000
+            4,Diana,22,Marketing,4500
+            5,Eve,28,IT,5500
+            6,Frank,40,Finance,8000
+            7,Grace,26,HR,5200
+            8,Henry,32,IT,6200
+            9,Ivy,24,Marketing,4800
+            10,Jack,38,Finance,7500";
+            let data_bytes = Bytes::from(data.to_vec());
+            let path = Path::from(context.input.key.clone());
+            store.put(&path, data_bytes.into()).await.map_err(|e| {
+                error!("put data into memory failed: {}", e.to_string());
+                QueryError::StoreError { e: e.to_string() }
+            })?;
+            df_session_state.with_object_store(&store_url, Arc::new(store)).build()
+        } else {
+            let store =
+                EcObjectStore::new(context.input.clone()).map_err(|_| QueryError::NotImplemented { err: String::new() })?;
+            df_session_state.with_object_store(&store_url, Arc::new(store)).build()
+        };
         let df_session_ctx = SessionContext::new_with_state(df_session_state);
         Ok(df_session_ctx)
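
Reviewer note: the is_test branch seeds an in-memory object_store under the request's key, so the CSV reader resolves the same s3://bucket/key URL it would use against the real EcObjectStore. The seeding pattern in isolation (a minimal sketch using the same object_store and bytes crates; the fixture content is illustrative):

use std::sync::Arc;

use bytes::Bytes;
use object_store::{memory::InMemory, path::Path, ObjectStore};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create the store and put a CSV fixture at the key the query will read.
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let path = Path::from("test.csv");
    store.put(&path, Bytes::from_static(b"id,name\n1,Alice").into()).await?;

    // Any SessionState registered with this store now sees the object.
    let body = store.get(&path).await?.bytes().await?;
    assert!(body.starts_with(b"id,name"));
    Ok(())
}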

View File

@@ -98,7 +98,7 @@ impl QueryDispatcher for SimpleQueryDispatcher {
     }
     async fn build_query_state_machine(&self, query: Query) -> QueryResult<Arc<QueryStateMachine>> {
-        let session = self.session_factory.create_session_ctx(query.context())?;
+        let session = self.session_factory.create_session_ctx(query.context()).await?;
         let query_state_machine = Arc::new(QueryStateMachine::begin(query, session));
         Ok(query_state_machine)

View File

@@ -63,11 +63,11 @@ where
     }
 }
-pub async fn make_cnosdbms(input: SelectObjectContentInput) -> QueryResult<impl DatabaseManagerSystem> {
+pub async fn make_cnosdbms(input: SelectObjectContentInput, is_test: bool) -> QueryResult<impl DatabaseManagerSystem> {
     // init Function Manager, we can define some UDF if need
     let func_manager = SimpleFunctionMetadataManager::default();
     // TODO session config need load global system config
-    let session_factory = Arc::new(SessionCtxFactory {});
+    let session_factory = Arc::new(SessionCtxFactory { is_test });
     let parser = Arc::new(DefaultParser::default());
     let optimizer = Arc::new(CascadeOptimizerBuilder::default().build());
     // TODO wrap, and num_threads configurable
@@ -92,3 +92,73 @@ pub async fn make_cnosdbms(input: SelectObjectContentInput) -> QueryResult<impl
     Ok(db_server)
 }
+#[cfg(test)]
+mod tests {
+    use api::{
+        query::{Context, Query},
+        server::dbms::DatabaseManagerSystem,
+    };
+    use datafusion::{arrow::util::pretty, assert_batches_eq};
+    use s3s::dto::{
+        CSVInput, CSVOutput, ExpressionType, InputSerialization, OutputSerialization, SelectObjectContentInput,
+        SelectObjectContentRequest,
+    };
+    use crate::instance::make_cnosdbms;
+    #[tokio::test]
+    #[ignore]
+    async fn test_simple_sql() {
+        let sql = "select * from S3Object";
+        let input = SelectObjectContentInput {
+            bucket: "dandan".to_string(),
+            expected_bucket_owner: None,
+            key: "test.csv".to_string(),
+            sse_customer_algorithm: None,
+            sse_customer_key: None,
+            sse_customer_key_md5: None,
+            request: SelectObjectContentRequest {
+                expression: sql.to_string(),
+                expression_type: ExpressionType::from_static("SQL"),
+                input_serialization: InputSerialization {
+                    csv: Some(CSVInput::default()),
+                    ..Default::default()
+                },
+                output_serialization: OutputSerialization {
+                    csv: Some(CSVOutput::default()),
+                    ..Default::default()
+                },
+                request_progress: None,
+                scan_range: None,
+            },
+        };
+        let db = make_cnosdbms(input.clone(), true).await.unwrap();
+        let query = Query::new(Context { input }, sql.to_string());
+        let result = db.execute(&query).await.unwrap();
+        let results = result.result().chunk_result().await.unwrap().to_vec();
+        let expected = [
+            "+----------------+----------+----------+------------+----------+",
+            "| column_1       | column_2 | column_3 | column_4   | column_5 |",
+            "+----------------+----------+----------+------------+----------+",
+            "| id             | name     | age      | department | salary   |",
+            "|             1  | Alice    | 25       | HR         | 5000     |",
+            "|             2  | Bob      | 30       | IT         | 6000     |",
+            "|             3  | Charlie  | 35       | Finance    | 7000     |",
+            "|             4  | Diana    | 22       | Marketing  | 4500     |",
+            "|             5  | Eve      | 28       | IT         | 5500     |",
+            "|             6  | Frank    | 40       | Finance    | 8000     |",
+            "|             7  | Grace    | 26       | HR         | 5200     |",
+            "|             8  | Henry    | 32       | IT         | 6200     |",
+            "|             9  | Ivy      | 24       | Marketing  | 4800     |",
+            "|             10 | Jack     | 38       | Finance    | 7500     |",
+            "+----------------+----------+----------+------------+----------+",
+        ];
+        assert_batches_eq!(expected, &results);
+        pretty::print_batches(&results).unwrap();
+    }
+}
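
Reviewer note: the test is marked #[ignore], so `cargo test` skips it by default; run it explicitly with `cargo test -- --ignored`. The trailing `pretty::print_batches` call is only a debugging aid once `assert_batches_eq!` has passed.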

View File

@@ -36,10 +36,7 @@ impl Optimizer for CascadeOptimizer {
         debug!("Original physical plan:\n{}\n", displayable(physical_plan.as_ref()).indent(false));
-        let optimized_physical_plan = {
-            self.physical_optimizer
-                .optimize(physical_plan, session)?
-        };
+        let optimized_physical_plan = { self.physical_optimizer.optimize(physical_plan, session)? };
         Ok(optimized_physical_plan)
     }