Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(planner): Allowing setting sort order of parquet files without specifying the schema #12466

Merged
merged 7 commits into from
Sep 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 26 additions & 5 deletions datafusion/core/src/datasource/listing_table_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

//! Factory for creating ListingTables with default options

use std::collections::HashSet;
use std::path::Path;
use std::sync::Arc;

Expand All @@ -27,7 +28,7 @@ use crate::datasource::listing::{
use crate::execution::context::SessionState;

use arrow::datatypes::{DataType, SchemaRef};
use datafusion_common::{arrow_datafusion_err, DataFusionError};
use datafusion_common::{arrow_datafusion_err, plan_err, DataFusionError, ToDFSchema};
use datafusion_common::{config_datafusion_err, Result};
use datafusion_expr::CreateExternalTable;

Expand Down Expand Up @@ -113,19 +114,39 @@ impl TableProviderFactory for ListingTableFactory {
.with_collect_stat(state.config().collect_statistics())
.with_file_extension(file_extension)
.with_target_partitions(state.config().target_partitions())
.with_table_partition_cols(table_partition_cols)
.with_file_sort_order(cmd.order_exprs.clone());
.with_table_partition_cols(table_partition_cols);

options
.validate_partitions(session_state, &table_path)
.await?;

let resolved_schema = match provided_schema {
None => options.infer_schema(session_state, &table_path).await?,
// We will need to check the table columns against the schema
// this is done so that we can do an ORDER BY for external table creation
// specifically for parquet file format.
// See: https://github.com/apache/datafusion/issues/7317
None => {
let schema = options.infer_schema(session_state, &table_path).await?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks great. Thank you @devanbenz -- perfect

let df_schema = schema.clone().to_dfschema()?;
let column_refs: HashSet<_> = cmd
.order_exprs
.iter()
.flat_map(|sort| sort.iter())
.flat_map(|s| s.expr.column_refs())
.collect();

for column in &column_refs {
if !df_schema.has_column(column) {
return plan_err!("Column {column} is not in schema");
}
}

schema
}
Some(s) => s,
};
let config = ListingTableConfig::new(table_path)
.with_listing_options(options)
.with_listing_options(options.with_file_sort_order(cmd.order_exprs.clone()))
.with_schema(resolved_schema);
let provider = ListingTable::try_new(config)?
.with_cache(state.runtime_env().cache_manager.get_file_statistic_cache());
Expand Down
30 changes: 26 additions & 4 deletions datafusion/sql/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1136,11 +1136,33 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
schema: &DFSchemaRef,
planner_context: &mut PlannerContext,
) -> Result<Vec<Vec<SortExpr>>> {
// Ask user to provide a schema if schema is empty.
if !order_exprs.is_empty() && schema.fields().is_empty() {
return plan_err!(
"Provide a schema before specifying the order while creating a table."
);
let results = order_exprs
.iter()
.map(|lex_order| {
let result = lex_order
.iter()
.map(|order_by_expr| {
let ordered_expr = &order_by_expr.expr;
let ordered_expr = ordered_expr.to_owned();
let ordered_expr = self
.sql_expr_to_logical_expr(
ordered_expr,
schema,
planner_context,
)
.unwrap();
let asc = order_by_expr.asc.unwrap_or(true);
let nulls_first = order_by_expr.nulls_first.unwrap_or(!asc);

SortExpr::new(ordered_expr, asc, nulls_first)
})
.collect::<Vec<SortExpr>>();
result
})
.collect::<Vec<Vec<SortExpr>>>();

return Ok(results);
}

let mut all_results = vec![];
Expand Down
7 changes: 7 additions & 0 deletions datafusion/sql/tests/sql_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2002,6 +2002,13 @@ fn create_external_table_parquet_no_schema() {
quick_test(sql, expected);
}

/// Plans a `CREATE EXTERNAL TABLE ... STORED AS PARQUET ... WITH ORDER (id)`
/// statement that declares no column list, and hands the statement together
/// with the expected plan text to `quick_test`.
#[test]
fn create_external_table_parquet_no_schema_sort_order() {
    // Expected plan text for the statement below.
    let expected_plan = "CreateExternalTable: Bare { table: \"t\" }";
    // No explicit schema is given; only the sort order is specified.
    let statement = "CREATE EXTERNAL TABLE t STORED AS PARQUET LOCATION 'foo.parquet' WITH ORDER (id)";
    quick_test(statement, expected_plan);
}

#[test]
fn equijoin_explicit_syntax() {
let sql = "SELECT id, order_id \
Expand Down
47 changes: 47 additions & 0 deletions datafusion/sqllogictest/test_files/create_external_table.slt
Original file line number Diff line number Diff line change
Expand Up @@ -228,3 +228,50 @@ OPTIONS (
format.delimiter '|',
has_header false,
compression gzip);

# Create an external parquet table and infer schema to order by

# query should succeed
statement ok
CREATE EXTERNAL TABLE t STORED AS parquet LOCATION '../../parquet-testing/data/alltypes_plain.parquet' WITH ORDER (id);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you also add a test that shows the table is actually ordered correctly?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can do


## Verify that the table is created with a sort order. Explain should show output_ordering=[id@0 ASC NULLS LAST]
query TT
EXPLAIN SELECT id FROM t ORDER BY id ASC;
----
logical_plan
01)Sort: t.id ASC NULLS LAST
02)--TableScan: t projection=[id]
physical_plan ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST]

## Test a DESC order and verify that output_ordering is still ASC, as declared by the table's WITH ORDER clause
query TT
EXPLAIN SELECT id FROM t ORDER BY id DESC;
----
logical_plan
01)Sort: t.id DESC NULLS FIRST
02)--TableScan: t projection=[id]
physical_plan
01)SortExec: expr=[id@0 DESC], preserve_partitioning=[false]
02)--ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST]

statement ok
DROP TABLE t;

# Create table with non default sort order
statement ok
CREATE EXTERNAL TABLE t STORED AS parquet LOCATION '../../parquet-testing/data/alltypes_plain.parquet' WITH ORDER (id DESC NULLS FIRST);

## Verify that the table is created with a sort order. Explain should show output_ordering=[id@0 DESC NULLS FIRST]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this test shows one small bug (the output ordering is DESC not DESC NULLS FIRST)

I think this is due to the fact that this PR computes nulls first like this:

                   let nulls_first = ordered_expr.nulls_first.unwrap_or(true);

But the SQL planner computes it like this:

// when asc is true, by default nulls last to be consistent with postgres
// postgres rule: https://www.postgresql.org/docs/current/queries-order.html
nulls_first.unwrap_or(!asc),

query TT
EXPLAIN SELECT id FROM t;
----
logical_plan TableScan: t projection=[id]
physical_plan ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id], output_ordering=[id@0 DESC]

statement ok
DROP TABLE t;

# query should fail with bad column
statement error DataFusion error: Error during planning: Column foo is not in schema
CREATE EXTERNAL TABLE t STORED AS parquet LOCATION '../../parquet-testing/data/alltypes_plain.parquet' WITH ORDER (foo);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another reason this will fail is that there is already a table named t -- so it is probably good to check the actual error

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

7 changes: 0 additions & 7 deletions datafusion/sqllogictest/test_files/order.slt
Original file line number Diff line number Diff line change
Expand Up @@ -653,13 +653,6 @@ physical_plan CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/te
query error DataFusion error: Error during planning: Column a is not in schema
CREATE EXTERNAL TABLE dt (a_id integer, a_str string, a_bool boolean) STORED AS CSV WITH ORDER (a ASC) LOCATION 'file://path/to/table';


# Create external table with DDL ordered columns without schema
# When schema is missing the query is expected to fail
query error DataFusion error: Error during planning: Provide a schema before specifying the order while creating a table\.
CREATE EXTERNAL TABLE dt STORED AS CSV WITH ORDER (a ASC) LOCATION 'file://path/to/table';


# Sort with duplicate sort expressions
# Table is sorted multiple times on the same column name and should not fail
statement ok
Expand Down