diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs index 591a19aab49b..fed63ec12b49 100644 --- a/datafusion/core/src/datasource/listing_table_factory.rs +++ b/datafusion/core/src/datasource/listing_table_factory.rs @@ -17,6 +17,7 @@ //! Factory for creating ListingTables with default options +use std::collections::HashSet; use std::path::Path; use std::sync::Arc; @@ -27,7 +28,7 @@ use crate::datasource::listing::{ use crate::execution::context::SessionState; use arrow::datatypes::{DataType, SchemaRef}; -use datafusion_common::{arrow_datafusion_err, DataFusionError}; +use datafusion_common::{arrow_datafusion_err, plan_err, DataFusionError, ToDFSchema}; use datafusion_common::{config_datafusion_err, Result}; use datafusion_expr::CreateExternalTable; @@ -113,19 +114,39 @@ impl TableProviderFactory for ListingTableFactory { .with_collect_stat(state.config().collect_statistics()) .with_file_extension(file_extension) .with_target_partitions(state.config().target_partitions()) - .with_table_partition_cols(table_partition_cols) - .with_file_sort_order(cmd.order_exprs.clone()); + .with_table_partition_cols(table_partition_cols); options .validate_partitions(session_state, &table_path) .await?; let resolved_schema = match provided_schema { - None => options.infer_schema(session_state, &table_path).await?, + // We will need to check the table columns against the schema + // this is done so that we can do an ORDER BY for external table creation + // specifically for parquet file format. + // See: https://github.com/apache/datafusion/issues/7317 + None => { + let schema = options.infer_schema(session_state, &table_path).await?; + let df_schema = schema.clone().to_dfschema()?; + let column_refs: HashSet<_> = cmd + .order_exprs + .iter() + .flat_map(|sort| sort.iter()) + .flat_map(|s| s.expr.column_refs()) + .collect(); + + for column in &column_refs { + if !df_schema.has_column(column) { + return plan_err!("Column {column} is not in schema"); + } + } + + schema + } Some(s) => s, }; let config = ListingTableConfig::new(table_path) - .with_listing_options(options) + .with_listing_options(options.with_file_sort_order(cmd.order_exprs.clone())) .with_schema(resolved_schema); let provider = ListingTable::try_new(config)? .with_cache(state.runtime_env().cache_manager.get_file_statistic_cache()); diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs index d9719e08052f..29dfe25993f1 100644 --- a/datafusion/sql/src/statement.rs +++ b/datafusion/sql/src/statement.rs @@ -1136,11 +1136,33 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { schema: &DFSchemaRef, planner_context: &mut PlannerContext, ) -> Result>> { - // Ask user to provide a schema if schema is empty. if !order_exprs.is_empty() && schema.fields().is_empty() { - return plan_err!( - "Provide a schema before specifying the order while creating a table." - ); + let results = order_exprs + .iter() + .map(|lex_order| { + let result = lex_order + .iter() + .map(|order_by_expr| { + let ordered_expr = &order_by_expr.expr; + let ordered_expr = ordered_expr.to_owned(); + let ordered_expr = self + .sql_expr_to_logical_expr( + ordered_expr, + schema, + planner_context, + ) + .unwrap(); + let asc = order_by_expr.asc.unwrap_or(true); + let nulls_first = order_by_expr.nulls_first.unwrap_or(!asc); + + SortExpr::new(ordered_expr, asc, nulls_first) + }) + .collect::>(); + result + }) + .collect::>>(); + + return Ok(results); } let mut all_results = vec![]; diff --git a/datafusion/sql/tests/sql_integration.rs b/datafusion/sql/tests/sql_integration.rs index bdb84af464f2..5c9655a55606 100644 --- a/datafusion/sql/tests/sql_integration.rs +++ b/datafusion/sql/tests/sql_integration.rs @@ -2002,6 +2002,13 @@ fn create_external_table_parquet_no_schema() { quick_test(sql, expected); } +#[test] +fn create_external_table_parquet_no_schema_sort_order() { + let sql = "CREATE EXTERNAL TABLE t STORED AS PARQUET LOCATION 'foo.parquet' WITH ORDER (id)"; + let expected = "CreateExternalTable: Bare { table: \"t\" }"; + quick_test(sql, expected); +} + #[test] fn equijoin_explicit_syntax() { let sql = "SELECT id, order_id \ diff --git a/datafusion/sqllogictest/test_files/create_external_table.slt b/datafusion/sqllogictest/test_files/create_external_table.slt index 3e2412cf021d..12b097c3d5d1 100644 --- a/datafusion/sqllogictest/test_files/create_external_table.slt +++ b/datafusion/sqllogictest/test_files/create_external_table.slt @@ -228,3 +228,50 @@ OPTIONS ( format.delimiter '|', has_header false, compression gzip); + +# Create an external parquet table and infer schema to order by + +# query should succeed +statement ok +CREATE EXTERNAL TABLE t STORED AS parquet LOCATION '../../parquet-testing/data/alltypes_plain.parquet' WITH ORDER (id); + +## Verify that the table is created with a sort order. Explain should show output_ordering=[id@0 ASC] +query TT +EXPLAIN SELECT id FROM t ORDER BY id ASC; +---- +logical_plan +01)Sort: t.id ASC NULLS LAST +02)--TableScan: t projection=[id] +physical_plan ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST] + +## Test a DESC order and verify that output_ordering is ASC from the previous OBRDER BY +query TT +EXPLAIN SELECT id FROM t ORDER BY id DESC; +---- +logical_plan +01)Sort: t.id DESC NULLS FIRST +02)--TableScan: t projection=[id] +physical_plan +01)SortExec: expr=[id@0 DESC], preserve_partitioning=[false] +02)--ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST] + +statement ok +DROP TABLE t; + +# Create table with non default sort order +statement ok +CREATE EXTERNAL TABLE t STORED AS parquet LOCATION '../../parquet-testing/data/alltypes_plain.parquet' WITH ORDER (id DESC NULLS FIRST); + +## Verify that the table is created with a sort order. Explain should show output_ordering=[id@0 DESC NULLS FIRST] +query TT +EXPLAIN SELECT id FROM t; +---- +logical_plan TableScan: t projection=[id] +physical_plan ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id], output_ordering=[id@0 DESC] + +statement ok +DROP TABLE t; + +# query should fail with bad column +statement error DataFusion error: Error during planning: Column foo is not in schema +CREATE EXTERNAL TABLE t STORED AS parquet LOCATION '../../parquet-testing/data/alltypes_plain.parquet' WITH ORDER (foo); diff --git a/datafusion/sqllogictest/test_files/order.slt b/datafusion/sqllogictest/test_files/order.slt index 7bb872e5a48f..f53363b6eb38 100644 --- a/datafusion/sqllogictest/test_files/order.slt +++ b/datafusion/sqllogictest/test_files/order.slt @@ -653,13 +653,6 @@ physical_plan CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/te query error DataFusion error: Error during planning: Column a is not in schema CREATE EXTERNAL TABLE dt (a_id integer, a_str string, a_bool boolean) STORED AS CSV WITH ORDER (a ASC) LOCATION 'file://path/to/table'; - -# Create external table with DDL ordered columns without schema -# When schema is missing the query is expected to fail -query error DataFusion error: Error during planning: Provide a schema before specifying the order while creating a table\. -CREATE EXTERNAL TABLE dt STORED AS CSV WITH ORDER (a ASC) LOCATION 'file://path/to/table'; - - # Sort with duplicate sort expressions # Table is sorted multiple times on the same column name and should not fail statement ok