Skip to content

Commit

Permalink
Support unparsing implicit lateral UNNEST plan to SQL text (#13824)
Browse files Browse the repository at this point in the history
* support unparsing the implicit lateral unnest plan

* cargo clippy and fmt

* refactor for `check_unnest_placeholder_with_outer_ref`

* add const for the prefix string of unnest and outer refernece column
  • Loading branch information
goldmedal authored Dec 25, 2024
1 parent 073a3b1 commit f52c56b
Show file tree
Hide file tree
Showing 5 changed files with 181 additions and 18 deletions.
9 changes: 7 additions & 2 deletions datafusion/expr/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2536,14 +2536,19 @@ pub fn schema_name_from_sorts(sorts: &[Sort]) -> Result<String, fmt::Error> {
Ok(s)
}

pub const OUTER_REFERENCE_COLUMN_PREFIX: &str = "outer_ref";
pub const UNNEST_COLUMN_PREFIX: &str = "UNNEST";

/// Format expressions for display as part of a logical plan. In many cases, this will produce
/// similar output to `Expr.name()` except that column names will be prefixed with '#'.
impl Display for Expr {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
match self {
Expr::Alias(Alias { expr, name, .. }) => write!(f, "{expr} AS {name}"),
Expr::Column(c) => write!(f, "{c}"),
Expr::OuterReferenceColumn(_, c) => write!(f, "outer_ref({c})"),
Expr::OuterReferenceColumn(_, c) => {
write!(f, "{OUTER_REFERENCE_COLUMN_PREFIX}({c})")
}
Expr::ScalarVariable(_, var_names) => write!(f, "{}", var_names.join(".")),
Expr::Literal(v) => write!(f, "{v:?}"),
Expr::Case(case) => {
Expand Down Expand Up @@ -2736,7 +2741,7 @@ impl Display for Expr {
},
Expr::Placeholder(Placeholder { id, .. }) => write!(f, "{id}"),
Expr::Unnest(Unnest { expr }) => {
write!(f, "UNNEST({expr})")
write!(f, "{UNNEST_COLUMN_PREFIX}({expr})")
}
}
}
Expand Down
83 changes: 70 additions & 13 deletions datafusion/sql/src/unparser/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,14 @@ use super::{
Unparser,
};
use crate::unparser::ast::UnnestRelationBuilder;
use crate::unparser::utils::unproject_agg_exprs;
use crate::unparser::utils::{find_unnest_node_until_relation, unproject_agg_exprs};
use crate::utils::UNNEST_PLACEHOLDER;
use datafusion_common::{
internal_err, not_impl_err,
tree_node::{TransformedResult, TreeNode},
Column, DataFusionError, Result, TableReference,
};
use datafusion_expr::expr::OUTER_REFERENCE_COLUMN_PREFIX;
use datafusion_expr::{
expr::Alias, BinaryExpr, Distinct, Expr, JoinConstraint, JoinType, LogicalPlan,
LogicalPlanBuilder, Operator, Projection, SortExpr, TableScan, Unnest,
Expand Down Expand Up @@ -235,9 +236,10 @@ impl Unparser<'_> {
plan: &LogicalPlan,
relation: &mut RelationBuilder,
alias: Option<ast::TableAlias>,
lateral: bool,
) -> Result<()> {
let mut derived_builder = DerivedRelationBuilder::default();
derived_builder.lateral(false).alias(alias).subquery({
derived_builder.lateral(lateral).alias(alias).subquery({
let inner_statement = self.plan_to_sql(plan)?;
if let ast::Statement::Query(inner_query) = inner_statement {
inner_query
Expand All @@ -257,15 +259,17 @@ impl Unparser<'_> {
alias: &str,
plan: &LogicalPlan,
relation: &mut RelationBuilder,
lateral: bool,
) -> Result<()> {
if self.dialect.requires_derived_table_alias() {
self.derive(
plan,
relation,
Some(self.new_table_alias(alias.to_string(), vec![])),
lateral,
)
} else {
self.derive(plan, relation, None)
self.derive(plan, relation, None, lateral)
}
}

Expand Down Expand Up @@ -317,10 +321,12 @@ impl Unparser<'_> {
// Projection can be top-level plan for unnest relation
// The projection generated by the `RecursiveUnnestRewriter` from a UNNEST relation will have
// only one expression, which is the placeholder column generated by the rewriter.
if self.dialect.unnest_as_table_factor()
&& p.expr.len() == 1
&& Self::is_unnest_placeholder(&p.expr[0])
{
let unnest_input_type = if p.expr.len() == 1 {
Self::check_unnest_placeholder_with_outer_ref(&p.expr[0])
} else {
None
};
if self.dialect.unnest_as_table_factor() && unnest_input_type.is_some() {
if let LogicalPlan::Unnest(unnest) = &p.input.as_ref() {
return self
.unnest_to_table_factor_sql(unnest, query, select, relation);
Expand All @@ -333,6 +339,9 @@ impl Unparser<'_> {
"derived_projection",
plan,
relation,
unnest_input_type
.filter(|t| matches!(t, UnnestInputType::OuterReference))
.is_some(),
);
}
self.reconstruct_select_statement(plan, p, select)?;
Expand Down Expand Up @@ -365,6 +374,7 @@ impl Unparser<'_> {
"derived_limit",
plan,
relation,
false,
);
}
if let Some(fetch) = &limit.fetch {
Expand Down Expand Up @@ -402,6 +412,7 @@ impl Unparser<'_> {
"derived_sort",
plan,
relation,
false,
);
}
let Some(query_ref) = query else {
Expand Down Expand Up @@ -472,6 +483,7 @@ impl Unparser<'_> {
"derived_distinct",
plan,
relation,
false,
);
}
let (select_distinct, input) = match distinct {
Expand Down Expand Up @@ -658,6 +670,7 @@ impl Unparser<'_> {
"derived_union",
plan,
relation,
false,
);
}

Expand Down Expand Up @@ -723,19 +736,54 @@ impl Unparser<'_> {
internal_err!("Unnest input is not a Projection: {unnest:?}")
}
}
_ => not_impl_err!("Unsupported operator: {plan:?}"),
LogicalPlan::Subquery(subquery)
if find_unnest_node_until_relation(subquery.subquery.as_ref())
.is_some() =>
{
if self.dialect.unnest_as_table_factor() {
self.select_to_sql_recursively(
subquery.subquery.as_ref(),
query,
select,
relation,
)
} else {
self.derive_with_dialect_alias(
"derived_unnest",
subquery.subquery.as_ref(),
relation,
true,
)
}
}
_ => {
not_impl_err!("Unsupported operator: {plan:?}")
}
}
}

/// Try to find the placeholder column name generated by `RecursiveUnnestRewriter`
/// Only match the pattern `Expr::Alias(Expr::Column("__unnest_placeholder(...)"))`
fn is_unnest_placeholder(expr: &Expr) -> bool {
/// Try to find the placeholder column name generated by `RecursiveUnnestRewriter`.
///
/// - If the column is a placeholder column match the pattern `Expr::Alias(Expr::Column("__unnest_placeholder(...)"))`,
/// it means it is a scalar column, return [UnnestInputType::Scalar].
/// - If the column is a placeholder column match the pattern `Expr::Alias(Expr::Column("__unnest_placeholder(outer_ref(...)))")`,
/// it means it is an outer reference column, return [UnnestInputType::OuterReference].
/// - If the column is not a placeholder column, return [None].
///
/// `outer_ref` is the display result of [Expr::OuterReferenceColumn]
fn check_unnest_placeholder_with_outer_ref(expr: &Expr) -> Option<UnnestInputType> {
if let Expr::Alias(Alias { expr, .. }) = expr {
if let Expr::Column(Column { name, .. }) = expr.as_ref() {
return name.starts_with(UNNEST_PLACEHOLDER);
if let Some(prefix) = name.strip_prefix(UNNEST_PLACEHOLDER) {
if prefix.starts_with(&format!("({}(", OUTER_REFERENCE_COLUMN_PREFIX))
{
return Some(UnnestInputType::OuterReference);
}
return Some(UnnestInputType::Scalar);
}
}
}
false
None
}

fn unnest_to_table_factor_sql(
Expand Down Expand Up @@ -1092,3 +1140,12 @@ impl From<BuilderError> for DataFusionError {
DataFusionError::External(Box::new(e))
}
}

/// The type of the input to the UNNEST table factor.
#[derive(Debug)]
enum UnnestInputType {
/// The input is a column reference. It will be presented like `outer_ref(column_name)`.
OuterReference,
/// The input is a scalar value. It will be presented like a scalar array or struct.
Scalar,
}
58 changes: 55 additions & 3 deletions datafusion/sql/src/unparser/rewrite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use datafusion_common::{
tree_node::{Transformed, TransformedResult, TreeNode, TreeNodeRewriter},
Column, HashMap, Result, TableReference,
};
use datafusion_expr::expr::Alias;
use datafusion_expr::expr::{Alias, UNNEST_COLUMN_PREFIX};
use datafusion_expr::{Expr, LogicalPlan, Projection, Sort, SortExpr};
use sqlparser::ast::Ident;

Expand Down Expand Up @@ -190,10 +190,11 @@ pub(super) fn rewrite_plan_for_sort_on_non_projected_fields(
}
}

/// This logic is to work out the columns and inner query for SubqueryAlias plan for both types of
/// subquery
/// This logic is to work out the columns and inner query for SubqueryAlias plan for some types of
/// subquery or unnest
/// - `(SELECT column_a as a from table) AS A`
/// - `(SELECT column_a from table) AS A (a)`
/// - `SELECT * FROM t1 CROSS JOIN UNNEST(t1.c1) AS u(c1)` (see [find_unnest_column_alias])
///
/// A roundtrip example for table alias with columns
///
Expand Down Expand Up @@ -222,6 +223,15 @@ pub(super) fn subquery_alias_inner_query_and_columns(
) -> (&LogicalPlan, Vec<Ident>) {
let plan: &LogicalPlan = subquery_alias.input.as_ref();

if let LogicalPlan::Subquery(subquery) = plan {
let (inner_projection, Some(column)) =
find_unnest_column_alias(subquery.subquery.as_ref())
else {
return (plan, vec![]);
};
return (inner_projection, vec![Ident::new(column)]);
}

let LogicalPlan::Projection(outer_projections) = plan else {
return (plan, vec![]);
};
Expand Down Expand Up @@ -257,6 +267,48 @@ pub(super) fn subquery_alias_inner_query_and_columns(
(outer_projections.input.as_ref(), columns)
}

/// Try to find the column alias for UNNEST in the inner projection.
/// For example:
/// ```sql
/// SELECT * FROM t1 CROSS JOIN UNNEST(t1.c1) AS u(c1)
/// ```
/// The above query will be parsed into the following plan:
/// ```text
/// Projection: *
/// Cross Join:
/// SubqueryAlias: t1
/// TableScan: t
/// SubqueryAlias: u
/// Subquery:
/// Projection: UNNEST(outer_ref(t1.c1)) AS c1
/// Projection: __unnest_placeholder(outer_ref(t1.c1),depth=1) AS UNNEST(outer_ref(t1.c1))
/// Unnest: lists[__unnest_placeholder(outer_ref(t1.c1))|depth=1] structs[]
/// Projection: outer_ref(t1.c1) AS __unnest_placeholder(outer_ref(t1.c1))
/// EmptyRelation
/// ```
/// The function will return the inner projection and the column alias `c1` if the column name
/// starts with `UNNEST(` (the `Display` result of [Expr::Unnest]) in the inner projection.
pub(super) fn find_unnest_column_alias(
plan: &LogicalPlan,
) -> (&LogicalPlan, Option<String>) {
if let LogicalPlan::Projection(projection) = plan {
if projection.expr.len() != 1 {
return (plan, None);
}
if let Some(Expr::Alias(alias)) = projection.expr.first() {
if alias
.expr
.schema_name()
.to_string()
.starts_with(&format!("{UNNEST_COLUMN_PREFIX}("))
{
return (projection.input.as_ref(), Some(alias.name.clone()));
}
}
}
(plan, None)
}

/// Injects column aliases into a subquery's logical plan. The function searches for a `Projection`
/// within the given plan, which may be wrapped by other operators (e.g., LIMIT, SORT).
/// If the top-level plan is a `Projection`, it directly injects the column aliases.
Expand Down
25 changes: 25 additions & 0 deletions datafusion/sql/src/unparser/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,31 @@ pub(crate) fn find_unnest_node_within_select(plan: &LogicalPlan) -> Option<&Unne
}
}

/// Recursively searches children of [LogicalPlan] to find Unnest node if exist
/// until encountering a Relation node with single input
pub(crate) fn find_unnest_node_until_relation(plan: &LogicalPlan) -> Option<&Unnest> {
// Note that none of the nodes that have a corresponding node can have more
// than 1 input node. E.g. Projection / Filter always have 1 input node.
let input = plan.inputs();
let input = if input.len() > 1 {
return None;
} else {
input.first()?
};

if let LogicalPlan::Unnest(unnest) = input {
Some(unnest)
} else if let LogicalPlan::TableScan(_) = input {
None
} else if let LogicalPlan::Subquery(_) = input {
None
} else if let LogicalPlan::SubqueryAlias(_) = input {
None
} else {
find_unnest_node_within_select(input)
}
}

/// Recursively searches children of [LogicalPlan] to find Window nodes if exist
/// prior to encountering a Join, TableScan, or a nested subquery (derived table factor).
/// If Window node is not found prior to this or at all before reaching the end
Expand Down
24 changes: 24 additions & 0 deletions datafusion/sql/tests/cases/plan_to_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,30 @@ fn roundtrip_statement_with_dialect() -> Result<()> {
parser_dialect: Box::new(GenericDialect {}),
unparser_dialect: Box::new(CustomDialectBuilder::default().with_unnest_as_table_factor(true).build()),
},
TestStatementWithDialect {
sql: "SELECT * FROM unnest_table u, UNNEST(u.array_col)",
expected: r#"SELECT * FROM unnest_table AS u CROSS JOIN UNNEST(u.array_col)"#,
parser_dialect: Box::new(GenericDialect {}),
unparser_dialect: Box::new(CustomDialectBuilder::default().with_unnest_as_table_factor(true).build()),
},
TestStatementWithDialect {
sql: "SELECT * FROM unnest_table u, UNNEST(u.array_col) AS t1 (c1)",
expected: r#"SELECT * FROM unnest_table AS u CROSS JOIN UNNEST(u.array_col) AS t1 (c1)"#,
parser_dialect: Box::new(GenericDialect {}),
unparser_dialect: Box::new(CustomDialectBuilder::default().with_unnest_as_table_factor(true).build()),
},
TestStatementWithDialect {
sql: "SELECT * FROM unnest_table u, UNNEST(u.array_col)",
expected: r#"SELECT * FROM unnest_table AS u CROSS JOIN LATERAL (SELECT UNNEST(u.array_col) AS "UNNEST(outer_ref(u.array_col))")"#,
parser_dialect: Box::new(GenericDialect {}),
unparser_dialect: Box::new(UnparserDefaultDialect {}),
},
TestStatementWithDialect {
sql: "SELECT * FROM unnest_table u, UNNEST(u.array_col) AS t1 (c1)",
expected: r#"SELECT * FROM unnest_table AS u CROSS JOIN LATERAL (SELECT UNNEST(u.array_col) AS "UNNEST(outer_ref(u.array_col))") AS t1 (c1)"#,
parser_dialect: Box::new(GenericDialect {}),
unparser_dialect: Box::new(UnparserDefaultDialect {}),
},
];

for query in tests {
Expand Down

0 comments on commit f52c56b

Please sign in to comment.