Merge pull request #342 from rustfs/fix-sql

fix sql

Improve the S3 Select (SelectObjectContent) SQL path: InternalError responses now include the underlying error message, and CSV input serialization honors the request's FileHeaderInfo (USE, IGNORE, NONE) as well as the field delimiter, quote, quote-escape, and comment characters. When the header row is absent or ignored, the inferred columns are renamed to the positional _1, _2, ... names that S3 Select queries expect. lazy_static is added as a dependency, the SQL unit tests are updated accordingly, and the systemd notify calls are re-wrapped (formatting only).

Author: guojidan
Date: 2025-04-21 09:47:02 +08:00
Committed by: GitHub
7 changed files with 139 additions and 31 deletions
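For reference, a minimal sketch (not part of this diff) of the CSV input settings that exercise the new header handling, modeled on the updated test_func_sql test further down. With FileHeaderInfo::IGNORE the header row is skipped, so the SQL expression addresses columns positionally, e.g. SELECT s._1 FROM S3Object s. The comma field delimiter is an illustrative assumption; the test itself passes FieldDelimiter::from("").

// Hedged sketch, mirroring the updated test; not part of this commit.
use s3s::dto::{CSVInput, FieldDelimiter, FileHeaderInfo, InputSerialization, RecordDelimiter};

fn example_input_serialization() -> InputSerialization {
    InputSerialization {
        csv: Some(CSVInput {
            // IGNORE: a header row exists but is skipped, so the query must use
            // positional column names such as _1, _2, ...
            file_header_info: Some(FileHeaderInfo::from_static(FileHeaderInfo::IGNORE)),
            field_delimiter: Some(FieldDelimiter::from(",")), // illustrative assumption
            record_delimiter: Some(RecordDelimiter::from("\n")),
            ..Default::default()
        }),
        ..Default::default()
    }
}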

Cargo.lock (generated)

@@ -6560,6 +6560,7 @@ dependencies = [
"datafusion",
"derive_builder",
"futures",
"lazy_static",
"parking_lot 0.12.3",
"s3s",
"snafu",


@@ -96,7 +96,9 @@ impl ServiceStateManager {
ServiceState::Starting => {
info!("Service is starting...");
#[cfg(target_os = "linux")]
if let Err(e) = libsystemd::daemon::notify(false, &[libsystemd::daemon::NotifyState::Status("Starting...".to_string())]) {
if let Err(e) =
libsystemd::daemon::notify(false, &[libsystemd::daemon::NotifyState::Status("Starting...".to_string())])
{
tracing::error!("Failed to notify systemd of starting state: {}", e);
}
}
@@ -111,7 +113,9 @@ impl ServiceStateManager {
ServiceState::Stopped => {
info!("Service has stopped");
#[cfg(target_os = "linux")]
if let Err(e) = libsystemd::daemon::notify(false, &[libsystemd::daemon::NotifyState::Status("Stopped".to_string())]) {
if let Err(e) =
libsystemd::daemon::notify(false, &[libsystemd::daemon::NotifyState::Status("Stopped".to_string())])
{
tracing::error!("Failed to notify systemd of stopped state: {}", e);
}
}


@@ -1895,10 +1895,13 @@ impl S3 for FS {
let db = make_rustfsms(input.clone(), false).await.map_err(|e| {
error!("make db failed, {}", e.to_string());
s3_error!(InternalError)
s3_error!(InternalError, "{}", e.to_string())
})?;
let query = Query::new(Context { input: input.clone() }, input.request.expression);
let result = db.execute(&query).await.map_err(|_| s3_error!(InternalError))?;
let result = db
.execute(&query)
.await
.map_err(|e| s3_error!(InternalError, "{}", e.to_string()))?;
let results = result.result().chunk_result().await.unwrap().to_vec();


@@ -1,8 +1,8 @@
use std::sync::Arc;
use bytes::Bytes;
use datafusion::{
execution::{context::SessionState, runtime_env::RuntimeEnvBuilder, SessionStateBuilder},
parquet::data_type::AsBytes,
prelude::SessionContext,
};
use object_store::{memory::InMemory, path::Path, ObjectStore};
@@ -65,7 +65,19 @@ impl SessionCtxFactory {
8,Henry,32,IT,6200
9,Ivy,24,Marketing,4800
10,Jack,38,Finance,7500";
let data_bytes = Bytes::from(data.to_vec());
let data_bytes = data.as_bytes();
// let data = r#""year"╦"gender"╦"ethnicity"╦"firstname"╦"count"╦"rank"
// "2011"╦"FEMALE"╦"ASIAN AND PACIFIC ISLANDER"╦"SOPHIA"╦"119"╦"1"
// "2011"╦"FEMALE"╦"ASIAN AND PACIFIC ISLANDER"╦"CHLOE"╦"106"╦"2"
// "2011"╦"FEMALE"╦"ASIAN AND PACIFIC ISLANDER"╦"EMILY"╦"93"╦"3"
// "2011"╦"FEMALE"╦"ASIAN AND PACIFIC ISLANDER"╦"OLIVIA"╦"89"╦"4"
// "2011"╦"FEMALE"╦"ASIAN AND PACIFIC ISLANDER"╦"EMMA"╦"75"╦"5"
// "2011"╦"FEMALE"╦"ASIAN AND PACIFIC ISLANDER"╦"ISABELLA"╦"67"╦"6"
// "2011"╦"FEMALE"╦"ASIAN AND PACIFIC ISLANDER"╦"TIFFANY"╦"54"╦"7"
// "2011"╦"FEMALE"╦"ASIAN AND PACIFIC ISLANDER"╦"ASHLEY"╦"52"╦"8"
// "2011"╦"FEMALE"╦"ASIAN AND PACIFIC ISLANDER"╦"FIONA"╦"48"╦"9"
// "2011"╦"FEMALE"╦"ASIAN AND PACIFIC ISLANDER"╦"ANGELA"╦"47"╦"10""#;
// let data_bytes = Bytes::from(data);
let path = Path::from(context.input.key.clone());
store.put(&path, data_bytes.into()).await.map_err(|e| {
error!("put data into memory failed: {}", e.to_string());


@@ -10,6 +10,7 @@ async-trait.workspace = true
datafusion = { workspace = true }
derive_builder = { workspace = true }
futures = { workspace = true }
lazy_static = { workspace = true }
parking_lot = { version = "0.12.3" }
s3s.workspace = true
snafu = { workspace = true, features = ["backtrace"] }


@@ -1,4 +1,5 @@
use std::{
ops::Deref,
pin::Pin,
sync::Arc,
task::{Context, Poll},
@@ -19,8 +20,10 @@ use api::{
};
use async_trait::async_trait;
use datafusion::{
arrow::{datatypes::SchemaRef, record_batch::RecordBatch},
config::CsvOptions,
arrow::{
datatypes::{Schema, SchemaRef},
record_batch::RecordBatch,
},
datasource::{
file_format::{csv::CsvFormat, json::JsonFormat, parquet::ParquetFormat},
listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl},
@@ -29,7 +32,8 @@ use datafusion::{
execution::{RecordBatchStream, SendableRecordBatchStream},
};
use futures::{Stream, StreamExt};
use s3s::dto::SelectObjectContentInput;
use lazy_static::lazy_static;
use s3s::dto::{FileHeaderInfo, SelectObjectContentInput};
use crate::{
execution::factory::QueryExecutionFactoryRef,
@@ -37,6 +41,12 @@ use crate::{
sql::logical::planner::DefaultLogicalPlanner,
};
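// FileHeaderInfo constants (IGNORE, NONE, USE), built once so they can be compared
// against the CSV header setting carried in the SelectObjectContent request.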
lazy_static! {
static ref IGNORE: FileHeaderInfo = FileHeaderInfo::from_static(FileHeaderInfo::IGNORE);
static ref NONE: FileHeaderInfo = FileHeaderInfo::from_static(FileHeaderInfo::NONE);
static ref USE: FileHeaderInfo = FileHeaderInfo::from_static(FileHeaderInfo::USE);
}
#[derive(Clone)]
pub struct SimpleQueryDispatcher {
input: SelectObjectContentInput,
@@ -138,25 +148,94 @@ impl SimpleQueryDispatcher {
async fn build_scheme_provider(&self, session: &SessionCtx) -> QueryResult<MetadataProvider> {
let path = format!("s3://{}/{}", self.input.bucket, self.input.key);
let table_path = ListingTableUrl::parse(path)?;
let listing_options = if self.input.request.input_serialization.csv.is_some() {
let file_format = CsvFormat::default().with_options(CsvOptions::default().with_has_header(true));
ListingOptions::new(Arc::new(file_format)).with_file_extension(".csv")
} else if self.input.request.input_serialization.parquet.is_some() {
let file_format = ParquetFormat::new();
ListingOptions::new(Arc::new(file_format)).with_file_extension(".parquet")
} else if self.input.request.input_serialization.json.is_some() {
let file_format = JsonFormat::default();
ListingOptions::new(Arc::new(file_format)).with_file_extension(".json")
} else {
return Err(QueryError::NotImplemented {
err: "not support this file type".to_string(),
});
};
let (listing_options, need_rename_volume_name, need_ignore_volume_name) =
if let Some(csv) = self.input.request.input_serialization.csv.as_ref() {
let mut need_rename_volume_name = false;
let mut need_ignore_volume_name = false;
let mut file_format = CsvFormat::default()
.with_comment(
csv.comments
.clone()
.map(|c| c.as_bytes().first().copied().unwrap_or_default()),
)
.with_escape(
csv.quote_escape_character
.clone()
.map(|e| e.as_bytes().first().copied().unwrap_or_default()),
);
if let Some(delimiter) = csv.field_delimiter.as_ref() {
file_format = file_format.with_delimiter(delimiter.as_bytes().first().copied().unwrap_or_default());
}
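// Map the request's FileHeaderInfo onto DataFusion's CSV options:
//   NONE   -> no header row; inferred columns are later renamed to positional _1, _2, ...
//   IGNORE -> a header row exists but is skipped; columns are likewise renamed positionally
//   USE    -> the header row supplies the column names referenced in the SQL expression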
match csv.file_header_info.as_ref() {
Some(info) => {
if *info == *NONE {
file_format = file_format.with_has_header(false);
need_rename_volume_name = true;
} else if *info == *IGNORE {
file_format = file_format.with_has_header(true);
need_rename_volume_name = true;
need_ignore_volume_name = true;
} else if *info == *USE {
file_format = file_format.with_has_header(true);
} else {
return Err(QueryError::NotImplemented {
err: "unsupported FileHeaderInfo".to_string(),
});
}
}
_ => {
return Err(QueryError::NotImplemented {
err: "unsupported FileHeaderInfo".to_string(),
});
}
}
if let Some(quote) = csv.quote_character.as_ref() {
file_format = file_format.with_quote(quote.as_bytes().first().copied().unwrap_or_default());
}
(
ListingOptions::new(Arc::new(file_format)).with_file_extension(".csv"),
need_rename_volume_name,
need_ignore_volume_name,
)
} else if self.input.request.input_serialization.parquet.is_some() {
let file_format = ParquetFormat::new();
(ListingOptions::new(Arc::new(file_format)).with_file_extension(".parquet"), false, false)
} else if self.input.request.input_serialization.json.is_some() {
let file_format = JsonFormat::default();
(ListingOptions::new(Arc::new(file_format)).with_file_extension(".json"), false, false)
} else {
return Err(QueryError::NotImplemented {
err: "not support this file type".to_string(),
});
};
let resolve_schema = listing_options.infer_schema(session.inner(), &table_path).await?;
let config = ListingTableConfig::new(table_path)
.with_listing_options(listing_options)
.with_schema(resolve_schema);
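// When the header row is absent or ignored, rewrite the inferred column names to the
// positional _1, _2, ... form used by S3 Select queries. DataFusion names headerless CSV
// columns column_1, column_2, ...; ignored header names are replaced by their position.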
let config = if need_rename_volume_name {
let mut new_fields = Vec::new();
for (i, field) in resolve_schema.fields().iter().enumerate() {
let f_name = field.name();
let mut_field = field.deref().clone();
if f_name.starts_with("column_") {
let re_name = f_name.replace("column_", "_");
new_fields.push(mut_field.with_name(re_name));
} else if need_ignore_volume_name {
let re_name = format!("_{}", i + 1);
new_fields.push(mut_field.with_name(re_name));
} else {
new_fields.push(mut_field);
}
}
let new_schema = Arc::new(Schema::new(new_fields).with_metadata(resolve_schema.metadata().clone()));
ListingTableConfig::new(table_path)
.with_listing_options(listing_options)
.with_schema(new_schema)
} else {
ListingTableConfig::new(table_path)
.with_listing_options(listing_options)
.with_schema(resolve_schema)
};
let provider = Arc::new(ListingTable::try_new(config)?);
let current_session_table_provider = self.build_table_handle_provider()?;
let metadata_provider =


@@ -101,8 +101,8 @@ mod tests {
};
use datafusion::{arrow::util::pretty, assert_batches_eq};
use s3s::dto::{
CSVInput, CSVOutput, ExpressionType, InputSerialization, OutputSerialization, SelectObjectContentInput,
SelectObjectContentRequest,
CSVInput, CSVOutput, ExpressionType, FieldDelimiter, FileHeaderInfo, InputSerialization, OutputSerialization,
RecordDelimiter, SelectObjectContentInput, SelectObjectContentRequest,
};
use crate::instance::make_rustfsms;
@@ -122,7 +122,10 @@ mod tests {
expression: sql.to_string(),
expression_type: ExpressionType::from_static("SQL"),
input_serialization: InputSerialization {
csv: Some(CSVInput::default()),
csv: Some(CSVInput {
file_header_info: Some(FileHeaderInfo::from_static(FileHeaderInfo::USE)),
..Default::default()
}),
..Default::default()
},
output_serialization: OutputSerialization {
@@ -164,7 +167,7 @@ mod tests {
#[tokio::test]
#[ignore]
async fn test_func_sql() {
let sql = "select count(s.id) from S3Object as s";
let sql = "SELECT s._1 FROM S3Object s";
let input = SelectObjectContentInput {
bucket: "dandan".to_string(),
expected_bucket_owner: None,
@@ -176,7 +179,12 @@ mod tests {
expression: sql.to_string(),
expression_type: ExpressionType::from_static("SQL"),
input_serialization: InputSerialization {
csv: Some(CSVInput::default()),
csv: Some(CSVInput {
file_header_info: Some(FileHeaderInfo::from_static(FileHeaderInfo::IGNORE)),
field_delimiter: Some(FieldDelimiter::from("")),
record_delimiter: Some(RecordDelimiter::from("\n")),
..Default::default()
}),
..Default::default()
},
output_serialization: OutputSerialization {