Make unnest consistent with DuckDB/ClickHouse, add option for preserve_nulls, update docs
alamb committed Aug 1, 2023
1 parent ddb9549 commit 0f060b7
Showing 8 changed files with 265 additions and 11 deletions.
2 changes: 2 additions & 0 deletions datafusion/common/src/lib.rs
@@ -34,6 +34,7 @@ pub mod stats;
mod table_reference;
pub mod test_util;
pub mod tree_node;
mod unnest;
pub mod utils;

pub use column::Column;
@@ -51,6 +52,7 @@ pub use scalar::{ScalarType, ScalarValue};
pub use schema_reference::{OwnedSchemaReference, SchemaReference};
pub use stats::{ColumnStatistics, Statistics};
pub use table_reference::{OwnedTableReference, ResolvedTableReference, TableReference};
pub use unnest::UnnestOptions;

/// Downcast an Arrow Array to a concrete type, return an `DataFusionError::Internal` if the cast is
/// not possible. In normal usage of DataFusion the downcast should always succeed.
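The body of the new `unnest` module is not expanded in this view. A minimal sketch of the `UnnestOptions` it exports, inferred only from how the rest of this diff uses it (`UnnestOptions::new()`, `with_preserve_nulls(bool)`, a public `preserve_nulls` field, and the planner's `options.clone()` call), might look like:

```rust
/// Options controlling the behavior of `unnest` (hedged sketch; the real
/// definition lives in datafusion/common/src/unnest.rs, not shown here).
#[derive(Debug, Clone)]
pub struct UnnestOptions {
    /// Should a NULL (as opposed to empty) list produce a NULL output row?
    pub preserve_nulls: bool,
}

impl UnnestOptions {
    /// Default options; `preserve_nulls: false` is an assumption, based on
    /// `DataFrame::unnest_column` delegating here without erroring.
    pub fn new() -> Self {
        Self {
            preserve_nulls: false,
        }
    }

    /// Builder-style setter used throughout the tests below.
    pub fn with_preserve_nulls(mut self, preserve_nulls: bool) -> Self {
        self.preserve_nulls = preserve_nulls;
        self
    }
}
```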
24 changes: 21 additions & 3 deletions datafusion/core/src/dataframe.rs
@@ -24,7 +24,7 @@ use arrow::array::{Array, ArrayRef, Int64Array, StringArray};
use arrow::compute::{cast, concat};
use arrow::datatypes::{DataType, Field};
use async_trait::async_trait;
use datafusion_common::{DataFusionError, SchemaError};
use datafusion_common::{DataFusionError, SchemaError, UnnestOptions};
use parquet::file::properties::WriterProperties;

use datafusion_common::{Column, DFSchema, ScalarValue};
@@ -151,20 +151,38 @@ impl DataFrame {

/// Expand each list element of a column to multiple rows.
///
/// See also:
///
/// 1. [`UnnestOptions`] documentation for the behavior of `unnest`
/// 2. [`Self::unnest_column_with_options`]
///
/// ```
/// # use datafusion::prelude::*;
/// # use datafusion::error::Result;
/// # #[tokio::main]
/// # async fn main() -> Result<()> {
/// let ctx = SessionContext::new();
/// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
/// let df = df.unnest_column("a")?;
/// let df = df.unnest_column_with_options("a", UnnestOptions::new())?;
/// # Ok(())
/// # }
/// ```
pub fn unnest_column(self, column: &str) -> Result<DataFrame> {
self.unnest_column_with_options(column, UnnestOptions::new())
}

/// Expand each list element of a column to multiple rows, with
/// behavior controlled by [`UnnestOptions`].
///
/// See the documentation on [`UnnestOptions`] for more
/// details about the behavior of `unnest`.
pub fn unnest_column_with_options(
self,
column: &str,
options: UnnestOptions,
) -> Result<DataFrame> {
let plan = LogicalPlanBuilder::from(self.plan)
.unnest_column(column)?
.unnest_column_with_options(column, options)?
.build()?;
Ok(DataFrame::new(self.session_state, plan))
}
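For illustration, a hedged end-to-end sketch of the new API; the CSV path and column name are reused from the doc example above and are illustrative only:

```rust
use datafusion::error::Result;
use datafusion::prelude::*;
use datafusion_common::UnnestOptions;

// Hedged usage sketch of the new `unnest_column_with_options` API.
async fn unnest_with_options(ctx: &SessionContext) -> Result<DataFrame> {
    let df = ctx
        .read_csv("tests/data/example.csv", CsvReadOptions::new())
        .await?;
    // Request that rows whose list is NULL are kept as a single NULL row.
    // As of this commit, executing such a plan returns a NotImplemented
    // error (see the guard added in physical_plan/unnest.rs below).
    let options = UnnestOptions::new().with_preserve_nulls(true);
    df.unnest_column_with_options("a", options)
}
```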
23 changes: 21 additions & 2 deletions datafusion/core/src/physical_plan/unnest.rs
@@ -27,6 +27,7 @@ use arrow::datatypes::{
};
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
use datafusion_common::UnnestOptions;
use datafusion_common::{cast::as_primitive_array, DataFusionError, Result};
use datafusion_execution::TaskContext;
use futures::Stream;
@@ -43,7 +44,10 @@ use crate::physical_plan::{

use super::DisplayAs;

/// Unnest the given column by joining the row with each value in the nested type.
/// Unnest the given column by joining the row with each value in the
/// nested type.
///
/// See [`UnnestOptions`] for more details and an example.
#[derive(Debug)]
pub struct UnnestExec {
/// Input execution plan
@@ -52,15 +56,23 @@ pub struct UnnestExec {
schema: SchemaRef,
/// The unnest column
column: Column,
/// Options
options: UnnestOptions,
}

impl UnnestExec {
/// Create a new [UnnestExec].
pub fn new(input: Arc<dyn ExecutionPlan>, column: Column, schema: SchemaRef) -> Self {
pub fn new(
input: Arc<dyn ExecutionPlan>,
column: Column,
schema: SchemaRef,
options: UnnestOptions,
) -> Self {
UnnestExec {
input,
schema,
column,
options,
}
}
}
@@ -107,6 +119,7 @@ impl ExecutionPlan for UnnestExec {
children[0].clone(),
self.column.clone(),
self.schema.clone(),
self.options.clone(),
)))
}

@@ -133,6 +146,12 @@
) -> Result<SendableRecordBatchStream> {
let input = self.input.execute(partition, context)?;

if self.options.preserve_nulls {
return Err(DataFusionError::NotImplemented(
"Unest with preserve_nulls=true".to_string(),
));
}

Ok(Box::pin(UnnestStream {
input,
schema: self.schema.clone(),
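A hedged sketch of how a caller now constructs the exec node with options threaded through, mirroring the `physical_planner.rs` change in the next file. `input`, `column`, and `schema` stand in for values a planner already holds, and the module paths are assumptions based on DataFusion's public re-exports at the time of this commit:

```rust
use std::sync::Arc;

use arrow::datatypes::SchemaRef;
use datafusion::physical_plan::{expressions::Column, unnest::UnnestExec, ExecutionPlan};
use datafusion_common::UnnestOptions;

// Hedged sketch: build an UnnestExec with explicit options, as the
// physical planner does below.
fn build_unnest(
    input: Arc<dyn ExecutionPlan>,
    column: Column,
    schema: SchemaRef,
    options: &UnnestOptions,
) -> Arc<dyn ExecutionPlan> {
    // The options are cloned into the exec node; execute() currently
    // rejects preserve_nulls=true with DataFusionError::NotImplemented.
    Arc::new(UnnestExec::new(input, column, schema, options.clone()))
}
```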
4 changes: 2 additions & 2 deletions datafusion/core/src/physical_planner.rs
@@ -1125,12 +1125,12 @@ impl DefaultPhysicalPlanner {

Ok(Arc::new(GlobalLimitExec::new(input, *skip, *fetch)))
}
LogicalPlan::Unnest(Unnest { input, column, schema }) => {
LogicalPlan::Unnest(Unnest { input, column, schema, options }) => {
let input = self.create_initial_plan(input, session_state).await?;
let column_exec = schema.index_of_column(column)
.map(|idx| Column::new(&column.name, idx))?;
let schema = SchemaRef::new(schema.as_ref().to_owned().into());
Ok(Arc::new(UnnestExec::new(input, column_exec, schema)))
Ok(Arc::new(UnnestExec::new(input, column_exec, schema, options.clone())))
}
LogicalPlan::Ddl(ddl) => {
// There is no default plan for DDl statements --
183 changes: 182 additions & 1 deletion datafusion/core/tests/dataframe/mod.rs
@@ -37,7 +37,7 @@ use datafusion::prelude::JoinType;
use datafusion::prelude::{CsvReadOptions, ParquetReadOptions};
use datafusion::test_util::parquet_test_data;
use datafusion::{assert_batches_eq, assert_batches_sorted_eq};
use datafusion_common::{DataFusionError, ScalarValue};
use datafusion_common::{DataFusionError, ScalarValue, UnnestOptions};
use datafusion_execution::config::SessionConfig;
use datafusion_expr::expr::{GroupingSet, Sort};
use datafusion_expr::Expr::Wildcard;
@@ -1044,6 +1044,82 @@ async fn unnest_columns() -> Result<()> {
Ok(())
}

#[tokio::test]
async fn unnest_column_preserve_nulls_not_supported() -> Result<()> {
// Unnest, preserving nulls not yet supported
let options = UnnestOptions::new().with_preserve_nulls(true);

let results = table_with_lists_and_nulls()
.await?
.clone()
.unnest_column_with_options("list", options)?
.collect()
.await;

assert_eq!(
results.unwrap_err().to_string(),
"This feature is not implemented: Unest with preserve_nulls=true"
);
Ok(())
}

#[tokio::test]
#[ignore] // https://github.com/apache/arrow-datafusion/issues/7087
async fn unnest_column_nulls() -> Result<()> {
let df = table_with_lists_and_nulls().await?;
let results = df.clone().collect().await?;
let expected = vec![
"+--------+----+",
"| list | id |",
"+--------+----+",
"| [1, 2] | A |",
"| | B |",
"| [] | C |",
"| [3] | D |",
"+--------+----+",
];
assert_batches_eq!(expected, &results);

// Unnest, preserving nulls (row with B is preserved)
let options = UnnestOptions::new().with_preserve_nulls(true);

let results = df
.clone()
.unnest_column_with_options("list", options)?
.collect()
.await?;
let expected = vec![
"+------+----+",
"| list | id |",
"+------+----+",
"| 1 | A |",
"| 2 | A |",
"| | B |",
"| 3 | D |",
"+------+----+",
];
assert_batches_eq!(expected, &results);

// NOTE this output is incorrect: with preserve_nulls=false the NULL
// row for id "B" should be dropped (tracked by the issue above)
let options = UnnestOptions::new().with_preserve_nulls(false);
let results = df
.unnest_column_with_options("list", options)?
.collect()
.await?;
let expected = vec![
"+------+----+",
"| list | id |",
"+------+----+",
"| 1 | A |",
"| 2 | A |",
"| | B |", // this row should not be here
"| 3 | D |",
"+------+----+",
];
assert_batches_eq!(expected, &results);

Ok(())
}

#[tokio::test]
async fn unnest_fixed_list() -> Result<()> {
let mut shape_id_builder = UInt32Builder::new();
@@ -1114,6 +1190,77 @@ async fn unnest_fixed_list() -> Result<()> {
Ok(())
}

#[tokio::test]
#[ignore] // https://github.com/apache/arrow-datafusion/issues/7087
async fn unnest_fixed_list_nonull() -> Result<()> {
let mut shape_id_builder = UInt32Builder::new();
let mut tags_builder = FixedSizeListBuilder::new(StringBuilder::new(), 2);

for idx in 0..6 {
// Append shape id.
shape_id_builder.append_value(idx as u32 + 1);

tags_builder
.values()
.append_value(format!("tag{}1", idx + 1));
tags_builder
.values()
.append_value(format!("tag{}2", idx + 1));
tags_builder.append(true);
}

let batch = RecordBatch::try_from_iter(vec![
("shape_id", Arc::new(shape_id_builder.finish()) as ArrayRef),
("tags", Arc::new(tags_builder.finish()) as ArrayRef),
])?;

let ctx = SessionContext::new();
ctx.register_batch("shapes", batch)?;
let df = ctx.table("shapes").await?;

let results = df.clone().collect().await?;
let expected = vec![
"+----------+----------------+",
"| shape_id | tags |",
"+----------+----------------+",
"| 1 | [tag11, tag12] |",
"| 2 | [tag21, tag22] |",
"| 3 | [tag31, tag32] |",
"| 4 | [tag41, tag42] |",
"| 5 | [tag51, tag52] |",
"| 6 | [tag61, tag62] |",
"+----------+----------------+",
];
assert_batches_sorted_eq!(expected, &results);

let options = UnnestOptions::new().with_preserve_nulls(true);
let results = df
.unnest_column_with_options("tags", options)?
.collect()
.await?;
let expected = vec![
"+----------+-------+",
"| shape_id | tags |",
"+----------+-------+",
"| 1 | tag11 |",
"| 1 | tag12 |",
"| 2 | tag21 |",
"| 2 | tag22 |",
"| 3 | tag31 |",
"| 3 | tag32 |",
"| 4 | tag41 |",
"| 4 | tag42 |",
"| 5 | tag51 |",
"| 5 | tag52 |",
"| 6 | tag61 |",
"| 6 | tag62 |",
"+----------+-------+",
];
assert_batches_sorted_eq!(expected, &results);

Ok(())
}

#[tokio::test]
async fn unnest_aggregate_columns() -> Result<()> {
const NUM_ROWS: usize = 5;
@@ -1294,6 +1441,40 @@ async fn table_with_nested_types(n: usize) -> Result<DataFrame> {
ctx.table("shapes").await
}

/// Return a DataFrame with a `list` column of integers (including a NULL
/// entry and an empty list) and a string `id` column
async fn table_with_lists_and_nulls() -> Result<DataFrame> {
let mut list_builder = ListBuilder::new(UInt32Builder::new());
let mut id_builder = StringBuilder::new();

// [1, 2], A
list_builder.values().append_value(1);
list_builder.values().append_value(2);
list_builder.append(true);
id_builder.append_value("A");

// NULL, B
list_builder.append(false);
id_builder.append_value("B");

// [], C
list_builder.append(true);
id_builder.append_value("C");

// [3], D
list_builder.values().append_value(3);
list_builder.append(true);
id_builder.append_value("D");

let batch = RecordBatch::try_from_iter(vec![
("list", Arc::new(list_builder.finish()) as ArrayRef),
("id", Arc::new(id_builder.finish()) as ArrayRef),
])?;

let ctx = SessionContext::new();
ctx.register_batch("shapes", batch)?;
ctx.table("shapes").await
}

pub async fn register_alltypes_tiny_pages_parquet(ctx: &SessionContext) -> Result<()> {
let testdata = parquet_test_data();
ctx.register_parquet(