From b1d8e6080218885f095287b37aef25f520d75f34 Mon Sep 17 00:00:00 2001
From: junxiang Mu <1948535941@qq.com>
Date: Mon, 21 Apr 2025 01:42:57 +0000
Subject: [PATCH] fix sql

Signed-off-by: junxiang Mu <1948535941@qq.com>
---
 Cargo.lock                               |   1 +
 rustfs/src/server/service_state.rs       |   8 +-
 rustfs/src/storage/ecfs.rs               |   7 +-
 s3select/api/src/query/session.rs        |  16 ++-
 s3select/query/Cargo.toml                |   1 +
 s3select/query/src/dispatcher/manager.rs | 119 +++++++++++++++++++----
 s3select/query/src/instance.rs           |  18 +++-
 7 files changed, 139 insertions(+), 31 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index f69232ec..47a013b2 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -6560,6 +6560,7 @@ dependencies = [
  "datafusion",
  "derive_builder",
  "futures",
+ "lazy_static",
  "parking_lot 0.12.3",
  "s3s",
  "snafu",
diff --git a/rustfs/src/server/service_state.rs b/rustfs/src/server/service_state.rs
index d27296e3..ad137f66 100644
--- a/rustfs/src/server/service_state.rs
+++ b/rustfs/src/server/service_state.rs
@@ -96,7 +96,9 @@
             ServiceState::Starting => {
                 info!("Service is starting...");
                 #[cfg(target_os = "linux")]
-                if let Err(e) = libsystemd::daemon::notify(false, &[libsystemd::daemon::NotifyState::Status("Starting...".to_string())]) {
+                if let Err(e) =
+                    libsystemd::daemon::notify(false, &[libsystemd::daemon::NotifyState::Status("Starting...".to_string())])
+                {
                     tracing::error!("Failed to notify systemd of starting state: {}", e);
                 }
             }
@@ -111,7 +113,9 @@
             ServiceState::Stopped => {
                 info!("Service has stopped");
                 #[cfg(target_os = "linux")]
-                if let Err(e) = libsystemd::daemon::notify(false, &[libsystemd::daemon::NotifyState::Status("Stopped".to_string())]) {
+                if let Err(e) =
+                    libsystemd::daemon::notify(false, &[libsystemd::daemon::NotifyState::Status("Stopped".to_string())])
+                {
                     tracing::error!("Failed to notify systemd of stopped state: {}", e);
                 }
             }
diff --git a/rustfs/src/storage/ecfs.rs b/rustfs/src/storage/ecfs.rs
index f1f53b9f..252cd316 100644
--- a/rustfs/src/storage/ecfs.rs
+++ b/rustfs/src/storage/ecfs.rs
@@ -1895,10 +1895,13 @@ impl S3 for FS {
 
         let db = make_rustfsms(input.clone(), false).await.map_err(|e| {
             error!("make db failed, {}", e.to_string());
-            s3_error!(InternalError)
+            s3_error!(InternalError, "{}", e.to_string())
         })?;
         let query = Query::new(Context { input: input.clone() }, input.request.expression);
-        let result = db.execute(&query).await.map_err(|_| s3_error!(InternalError))?;
+        let result = db
+            .execute(&query)
+            .await
+            .map_err(|e| s3_error!(InternalError, "{}", e.to_string()))?;
 
         let results = result.result().chunk_result().await.unwrap().to_vec();
 
diff --git a/s3select/api/src/query/session.rs b/s3select/api/src/query/session.rs
index c9d91f51..286ee9f8 100644
--- a/s3select/api/src/query/session.rs
+++ b/s3select/api/src/query/session.rs
@@ -1,8 +1,8 @@
 use std::sync::Arc;
 
-use bytes::Bytes;
 use datafusion::{
     execution::{context::SessionState, runtime_env::RuntimeEnvBuilder, SessionStateBuilder},
+    parquet::data_type::AsBytes,
     prelude::SessionContext,
 };
 use object_store::{memory::InMemory, path::Path, ObjectStore};
@@ -65,7 +65,19 @@ impl SessionCtxFactory {
 8,Henry,32,IT,6200
 9,Ivy,24,Marketing,4800
 10,Jack,38,Finance,7500";
-        let data_bytes = Bytes::from(data.to_vec());
+        let data_bytes = data.as_bytes();
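+        // The commented-out block below is an alternative "╦"-delimited sample
+        // dataset, matching the delimiter exercised by test_func_sql in
+        // s3select/query/src/instance.rs; swap it in when testing custom delimiters.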
"2011"╦"FEMALE"╦"ASIAN AND PACIFIC ISLANDER"╦"EMILY"╦"93"╦"3" + // "2011"╦"FEMALE"╦"ASIAN AND PACIFIC ISLANDER"╦"OLIVIA"╦"89"╦"4" + // "2011"╦"FEMALE"╦"ASIAN AND PACIFIC ISLANDER"╦"EMMA"╦"75"╦"5" + // "2011"╦"FEMALE"╦"ASIAN AND PACIFIC ISLANDER"╦"ISABELLA"╦"67"╦"6" + // "2011"╦"FEMALE"╦"ASIAN AND PACIFIC ISLANDER"╦"TIFFANY"╦"54"╦"7" + // "2011"╦"FEMALE"╦"ASIAN AND PACIFIC ISLANDER"╦"ASHLEY"╦"52"╦"8" + // "2011"╦"FEMALE"╦"ASIAN AND PACIFIC ISLANDER"╦"FIONA"╦"48"╦"9" + // "2011"╦"FEMALE"╦"ASIAN AND PACIFIC ISLANDER"╦"ANGELA"╦"47"╦"10""#; + // let data_bytes = Bytes::from(data); let path = Path::from(context.input.key.clone()); store.put(&path, data_bytes.into()).await.map_err(|e| { error!("put data into memory failed: {}", e.to_string()); diff --git a/s3select/query/Cargo.toml b/s3select/query/Cargo.toml index 8d9b159c..559eab27 100644 --- a/s3select/query/Cargo.toml +++ b/s3select/query/Cargo.toml @@ -10,6 +10,7 @@ async-trait.workspace = true datafusion = { workspace = true } derive_builder = { workspace = true } futures = { workspace = true } +lazy_static = { workspace = true } parking_lot = { version = "0.12.3" } s3s.workspace = true snafu = { workspace = true, features = ["backtrace"] } diff --git a/s3select/query/src/dispatcher/manager.rs b/s3select/query/src/dispatcher/manager.rs index e85e7a54..05f76343 100644 --- a/s3select/query/src/dispatcher/manager.rs +++ b/s3select/query/src/dispatcher/manager.rs @@ -1,4 +1,5 @@ use std::{ + ops::Deref, pin::Pin, sync::Arc, task::{Context, Poll}, @@ -19,8 +20,10 @@ use api::{ }; use async_trait::async_trait; use datafusion::{ - arrow::{datatypes::SchemaRef, record_batch::RecordBatch}, - config::CsvOptions, + arrow::{ + datatypes::{Schema, SchemaRef}, + record_batch::RecordBatch, + }, datasource::{ file_format::{csv::CsvFormat, json::JsonFormat, parquet::ParquetFormat}, listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl}, @@ -29,7 +32,8 @@ use datafusion::{ execution::{RecordBatchStream, SendableRecordBatchStream}, }; use futures::{Stream, StreamExt}; -use s3s::dto::SelectObjectContentInput; +use lazy_static::lazy_static; +use s3s::dto::{FileHeaderInfo, SelectObjectContentInput}; use crate::{ execution::factory::QueryExecutionFactoryRef, @@ -37,6 +41,12 @@ use crate::{ sql::logical::planner::DefaultLogicalPlanner, }; +lazy_static! 
+lazy_static! {
+    static ref IGNORE: FileHeaderInfo = FileHeaderInfo::from_static(FileHeaderInfo::IGNORE);
+    static ref NONE: FileHeaderInfo = FileHeaderInfo::from_static(FileHeaderInfo::NONE);
+    static ref USE: FileHeaderInfo = FileHeaderInfo::from_static(FileHeaderInfo::USE);
+}
+
 #[derive(Clone)]
 pub struct SimpleQueryDispatcher {
     input: SelectObjectContentInput,
@@ -138,25 +148,94 @@ impl SimpleQueryDispatcher {
     async fn build_scheme_provider(&self, session: &SessionCtx) -> QueryResult {
         let path = format!("s3://{}/{}", self.input.bucket, self.input.key);
         let table_path = ListingTableUrl::parse(path)?;
-        let listing_options = if self.input.request.input_serialization.csv.is_some() {
-            let file_format = CsvFormat::default().with_options(CsvOptions::default().with_has_header(true));
-            ListingOptions::new(Arc::new(file_format)).with_file_extension(".csv")
-        } else if self.input.request.input_serialization.parquet.is_some() {
-            let file_format = ParquetFormat::new();
-            ListingOptions::new(Arc::new(file_format)).with_file_extension(".parquet")
-        } else if self.input.request.input_serialization.json.is_some() {
-            let file_format = JsonFormat::default();
-            ListingOptions::new(Arc::new(file_format)).with_file_extension(".json")
-        } else {
-            return Err(QueryError::NotImplemented {
-                err: "not support this file type".to_string(),
-            });
-        };
+        let (listing_options, need_rename_column_name, need_ignore_column_name) =
+            if let Some(csv) = self.input.request.input_serialization.csv.as_ref() {
+                let mut need_rename_column_name = false;
+                let mut need_ignore_column_name = false;
+                let mut file_format = CsvFormat::default()
+                    .with_comment(
+                        csv.comments
+                            .clone()
+                            .map(|c| c.as_bytes().first().copied().unwrap_or_default()),
+                    )
+                    .with_escape(
+                        csv.quote_escape_character
+                            .clone()
+                            .map(|e| e.as_bytes().first().copied().unwrap_or_default()),
+                    );
+                if let Some(delimiter) = csv.field_delimiter.as_ref() {
+                    file_format = file_format.with_delimiter(delimiter.as_bytes().first().copied().unwrap_or_default());
+                }
+                match csv.file_header_info.as_ref() {
+                    Some(info) => {
+                        if *info == *NONE {
+                            file_format = file_format.with_has_header(false);
+                            need_rename_column_name = true;
+                        } else if *info == *IGNORE {
+                            file_format = file_format.with_has_header(true);
+                            need_rename_column_name = true;
+                            need_ignore_column_name = true;
+                        } else if *info == *USE {
+                            file_format = file_format.with_has_header(true);
+                        } else {
+                            return Err(QueryError::NotImplemented {
+                                err: "unsupported FileHeaderInfo".to_string(),
+                            });
+                        }
+                    }
+                    _ => {
+                        return Err(QueryError::NotImplemented {
+                            err: "unsupported FileHeaderInfo".to_string(),
+                        });
+                    }
+                }
+                if let Some(quote) = csv.quote_character.as_ref() {
+                    file_format = file_format.with_quote(quote.as_bytes().first().copied().unwrap_or_default());
+                }
+                (
+                    ListingOptions::new(Arc::new(file_format)).with_file_extension(".csv"),
+                    need_rename_column_name,
+                    need_ignore_column_name,
+                )
+            } else if self.input.request.input_serialization.parquet.is_some() {
+                let file_format = ParquetFormat::new();
+                (ListingOptions::new(Arc::new(file_format)).with_file_extension(".parquet"), false, false)
+            } else if self.input.request.input_serialization.json.is_some() {
+                let file_format = JsonFormat::default();
+                (ListingOptions::new(Arc::new(file_format)).with_file_extension(".json"), false, false)
+            } else {
+                return Err(QueryError::NotImplemented {
+                    err: "unsupported file type".to_string(),
+                });
+            };
 
         let resolve_schema = listing_options.infer_schema(session.inner(), &table_path).await?;
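+        // S3 Select addresses headerless columns positionally as _1, _2, ...,
+        // while DataFusion infers them as column_1, column_2, ...; rewrite the
+        // inferred names (and, for IGNORE, every header-derived name) to match.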
-        let config = ListingTableConfig::new(table_path)
-            .with_listing_options(listing_options)
-            .with_schema(resolve_schema);
+        let config = if need_rename_column_name {
+            let mut new_fields = Vec::new();
+            for (i, field) in resolve_schema.fields().iter().enumerate() {
+                let f_name = field.name();
+                let mut_field = field.deref().clone();
+                if f_name.starts_with("column_") {
+                    let re_name = f_name.replace("column_", "_");
+                    new_fields.push(mut_field.with_name(re_name));
+                } else if need_ignore_column_name {
+                    let re_name = format!("_{}", i + 1);
+                    new_fields.push(mut_field.with_name(re_name));
+                } else {
+                    new_fields.push(mut_field);
+                }
+            }
+            let new_schema = Arc::new(Schema::new(new_fields).with_metadata(resolve_schema.metadata().clone()));
+            ListingTableConfig::new(table_path)
+                .with_listing_options(listing_options)
+                .with_schema(new_schema)
+        } else {
+            ListingTableConfig::new(table_path)
+                .with_listing_options(listing_options)
+                .with_schema(resolve_schema)
+        };
         let provider = Arc::new(ListingTable::try_new(config)?);
         let current_session_table_provider = self.build_table_handle_provider()?;
         let metadata_provider =
diff --git a/s3select/query/src/instance.rs b/s3select/query/src/instance.rs
index 3ad8941a..44952a4a 100644
--- a/s3select/query/src/instance.rs
+++ b/s3select/query/src/instance.rs
@@ -101,8 +101,8 @@ mod tests {
     };
     use datafusion::{arrow::util::pretty, assert_batches_eq};
     use s3s::dto::{
-        CSVInput, CSVOutput, ExpressionType, InputSerialization, OutputSerialization, SelectObjectContentInput,
-        SelectObjectContentRequest,
+        CSVInput, CSVOutput, ExpressionType, FieldDelimiter, FileHeaderInfo, InputSerialization, OutputSerialization,
+        RecordDelimiter, SelectObjectContentInput, SelectObjectContentRequest,
     };
 
     use crate::instance::make_rustfsms;
@@ -122,7 +122,10 @@
             expression: sql.to_string(),
             expression_type: ExpressionType::from_static("SQL"),
             input_serialization: InputSerialization {
-                csv: Some(CSVInput::default()),
+                csv: Some(CSVInput {
+                    file_header_info: Some(FileHeaderInfo::from_static(FileHeaderInfo::USE)),
+                    ..Default::default()
+                }),
                 ..Default::default()
             },
             output_serialization: OutputSerialization {
@@ -164,7 +167,7 @@
     #[tokio::test]
     #[ignore]
     async fn test_func_sql() {
-        let sql = "select count(s.id) from S3Object as s";
+        let sql = "SELECT s._1 FROM S3Object s";
         let input = SelectObjectContentInput {
             bucket: "dandan".to_string(),
             expected_bucket_owner: None,
@@ -176,7 +179,12 @@
             expression: sql.to_string(),
             expression_type: ExpressionType::from_static("SQL"),
             input_serialization: InputSerialization {
-                csv: Some(CSVInput::default()),
+                csv: Some(CSVInput {
+                    file_header_info: Some(FileHeaderInfo::from_static(FileHeaderInfo::IGNORE)),
+                    field_delimiter: Some(FieldDelimiter::from("╦")),
+                    record_delimiter: Some(RecordDelimiter::from("\n")),
+                    ..Default::default()
+                }),
                 ..Default::default()
             },
             output_serialization: OutputSerialization {