Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support unparsing implicit lateral UNNEST plan to SQL text #13824

Merged
merged 5 commits into from
Dec 25, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 51 additions & 12 deletions datafusion/sql/src/unparser/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use super::{
Unparser,
};
use crate::unparser::ast::UnnestRelationBuilder;
use crate::unparser::utils::unproject_agg_exprs;
use crate::unparser::utils::{find_unnest_node_until_relation, unproject_agg_exprs};
use crate::utils::UNNEST_PLACEHOLDER;
use datafusion_common::{
internal_err, not_impl_err,
Expand Down Expand Up @@ -235,9 +235,10 @@ impl Unparser<'_> {
plan: &LogicalPlan,
relation: &mut RelationBuilder,
alias: Option<ast::TableAlias>,
lateral: bool,
) -> Result<()> {
let mut derived_builder = DerivedRelationBuilder::default();
derived_builder.lateral(false).alias(alias).subquery({
derived_builder.lateral(lateral).alias(alias).subquery({
let inner_statement = self.plan_to_sql(plan)?;
if let ast::Statement::Query(inner_query) = inner_statement {
inner_query
Expand All @@ -257,15 +258,17 @@ impl Unparser<'_> {
alias: &str,
plan: &LogicalPlan,
relation: &mut RelationBuilder,
lateral: bool,
) -> Result<()> {
if self.dialect.requires_derived_table_alias() {
self.derive(
plan,
relation,
Some(self.new_table_alias(alias.to_string(), vec![])),
lateral,
)
} else {
self.derive(plan, relation, None)
self.derive(plan, relation, None, lateral)
}
}

Expand Down Expand Up @@ -317,10 +320,12 @@ impl Unparser<'_> {
// Projection can be top-level plan for unnest relation
// The projection generated by the `RecursiveUnnestRewriter` from a UNNEST relation will have
// only one expression, which is the placeholder column generated by the rewriter.
if self.dialect.unnest_as_table_factor()
&& p.expr.len() == 1
&& Self::is_unnest_placeholder(&p.expr[0])
{
let (is_unnest, is_lateral) = if p.expr.len() == 1 {
Self::is_unnest_placeholder_with_outer_ref(&p.expr[0])
} else {
(false, false)
};
if self.dialect.unnest_as_table_factor() && is_unnest {
if let LogicalPlan::Unnest(unnest) = &p.input.as_ref() {
return self
.unnest_to_table_factor_sql(unnest, query, select, relation);
Expand All @@ -333,6 +338,7 @@ impl Unparser<'_> {
"derived_projection",
plan,
relation,
is_lateral,
);
}
self.reconstruct_select_statement(plan, p, select)?;
Expand Down Expand Up @@ -365,6 +371,7 @@ impl Unparser<'_> {
"derived_limit",
plan,
relation,
false,
);
}
if let Some(fetch) = &limit.fetch {
Expand Down Expand Up @@ -402,6 +409,7 @@ impl Unparser<'_> {
"derived_sort",
plan,
relation,
false,
);
}
let Some(query_ref) = query else {
Expand Down Expand Up @@ -472,6 +480,7 @@ impl Unparser<'_> {
"derived_distinct",
plan,
relation,
false,
);
}
let (select_distinct, input) = match distinct {
Expand Down Expand Up @@ -658,6 +667,7 @@ impl Unparser<'_> {
"derived_union",
plan,
relation,
false,
);
}

Expand Down Expand Up @@ -723,19 +733,48 @@ impl Unparser<'_> {
internal_err!("Unnest input is not a Projection: {unnest:?}")
}
}
_ => not_impl_err!("Unsupported operator: {plan:?}"),
LogicalPlan::Subquery(subquery)
if find_unnest_node_until_relation(subquery.subquery.as_ref())
.is_some() =>
{
if self.dialect.unnest_as_table_factor() {
self.select_to_sql_recursively(
subquery.subquery.as_ref(),
query,
select,
relation,
)
} else {
self.derive_with_dialect_alias(
"derived_unnest",
subquery.subquery.as_ref(),
relation,
true,
)
}
}
_ => {
not_impl_err!("Unsupported operator: {plan:?}")
}
}
}

/// Try to find the placeholder column name generated by `RecursiveUnnestRewriter`
/// Only match the pattern `Expr::Alias(Expr::Column("__unnest_placeholder(...)"))`
fn is_unnest_placeholder(expr: &Expr) -> bool {
/// The first return value is a boolean indicating if the column is a placeholder column:
/// Try to match the pattern `Expr::Alias(Expr::Column("__unnest_placeholder(...)"))`
/// The second return value is a boolean indicating if the column uses an outer reference:
/// Try to match the pattern `Expr::Alias(Expr::Column("__unnest_placeholder(outer_ref(...)))")`
///
/// `outer_ref` is the display result of [Expr::OuterReferenceColumn]
fn is_unnest_placeholder_with_outer_ref(expr: &Expr) -> (bool, bool) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we define an enum that we return here? That is clearer to read than two booleans and we can limit it to reference only the possible states (i.e. it doesn't look like (false, true) can happen, but its something a caller would need to handle from the type system)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

enum sounds like a good idea. I will try it. Thanks 👍

if let Expr::Alias(Alias { expr, .. }) = expr {
if let Expr::Column(Column { name, .. }) = expr.as_ref() {
return name.starts_with(UNNEST_PLACEHOLDER);
if let Some(prefix) = name.strip_prefix(UNNEST_PLACEHOLDER) {
return (true, prefix.starts_with("(outer_ref("));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a huge fan of this string matching. At least the UNNEST_PLACEHOLDER is a shared const, but the outer_ref could potentially change and we would miss it here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't feel very strongly about this since it does seem unlikely to change though.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a huge fan of this string matching.

I agree that string matching isn't a good way but I think it's the only way to recognize if the outer reference column is exits in the unnest plan currently. 🤔

At least the UNNEST_PLACEHOLDER is a shared const, but the outer_ref could potentially change and we would miss it here.

Maybe we can use something like UNNEST_PLACEHOLDER to create a global constant for outer_ref. At least then, we won't miss it when someone needs to change it.

Expr::OuterReferenceColumn(_, c) => write!(f, "outer_ref({c})"),

}
}
}
false
(false, false)
}

fn unnest_to_table_factor_sql(
Expand Down
51 changes: 49 additions & 2 deletions datafusion/sql/src/unparser/rewrite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,10 +190,11 @@ pub(super) fn rewrite_plan_for_sort_on_non_projected_fields(
}
}

/// This logic is to work out the columns and inner query for SubqueryAlias plan for both types of
/// subquery
/// This logic is to work out the columns and inner query for SubqueryAlias plan for some types of
/// subquery or unnest
/// - `(SELECT column_a as a from table) AS A`
/// - `(SELECT column_a from table) AS A (a)`
/// - `SELECT * FROM t1 CROSS JOIN UNNEST(t1.c1) AS u(c1)` (see [find_unnest_column_alias])
///
/// A roundtrip example for table alias with columns
///
Expand Down Expand Up @@ -222,6 +223,15 @@ pub(super) fn subquery_alias_inner_query_and_columns(
) -> (&LogicalPlan, Vec<Ident>) {
let plan: &LogicalPlan = subquery_alias.input.as_ref();

if let LogicalPlan::Subquery(subquery) = plan {
let (inner_projection, Some(column)) =
find_unnest_column_alias(subquery.subquery.as_ref())
else {
return (plan, vec![]);
};
return (inner_projection, vec![Ident::new(column)]);
}

let LogicalPlan::Projection(outer_projections) = plan else {
return (plan, vec![]);
};
Expand Down Expand Up @@ -257,6 +267,43 @@ pub(super) fn subquery_alias_inner_query_and_columns(
(outer_projections.input.as_ref(), columns)
}

/// Try to find the column alias for UNNEST in the inner projection.
/// For example:
/// ```sql
/// SELECT * FROM t1 CROSS JOIN UNNEST(t1.c1) AS u(c1)
/// ```
/// The above query will be parsed into the following plan:
/// ```text
/// Projection: *
/// Cross Join:
/// SubqueryAlias: t1
/// TableScan: t
/// SubqueryAlias: u
/// Subquery:
/// Projection: UNNEST(outer_ref(t1.c1)) AS c1
/// Projection: __unnest_placeholder(outer_ref(t1.c1),depth=1) AS UNNEST(outer_ref(t1.c1))
/// Unnest: lists[__unnest_placeholder(outer_ref(t1.c1))|depth=1] structs[]
/// Projection: outer_ref(t1.c1) AS __unnest_placeholder(outer_ref(t1.c1))
/// EmptyRelation
/// ```
/// The function will return the inner projection and the column alias `c1` if the column name
/// starts with `UNNEST(` (the `Display` result of [Expr::Unnest]) in the inner projection.
pub(super) fn find_unnest_column_alias(
plan: &LogicalPlan,
) -> (&LogicalPlan, Option<String>) {
if let LogicalPlan::Projection(projection) = plan {
if projection.expr.len() != 1 {
return (plan, None);
}
if let Some(Expr::Alias(alias)) = projection.expr.first() {
if alias.expr.schema_name().to_string().starts_with("UNNEST(") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this not an expression of type Unnest? I don't think we need to do this string allocation here just to check that right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it's not an Expr::Unnest. The full plan is

///     SubqueryAlias: u
///       Subquery:
///         Projection: UNNEST(outer_ref(t1.c1)) AS c1
///           Projection: __unnest_placeholder(outer_ref(t1.c1),depth=1) AS UNNEST(outer_ref(t1.c1))
///             Unnest: lists[__unnest_placeholder(outer_ref(t1.c1))|depth=1] structs[]
///               Projection: outer_ref(t1.c1) AS __unnest_placeholder(outer_ref(t1.c1))
///                 EmptyRelation

It's a column pointing to an alias of its child. The alias is built from an expression schema_name.

let post_unnest_expr = col(post_unnest_name.clone()).alias(alias_name);

I think there is a similar issue for #13824 (comment).
Maybe we can have a constant for the display result of UNNEST.

return (projection.input.as_ref(), Some(alias.name.clone()));
}
}
}
(plan, None)
}

/// Injects column aliases into a subquery's logical plan. The function searches for a `Projection`
/// within the given plan, which may be wrapped by other operators (e.g., LIMIT, SORT).
/// If the top-level plan is a `Projection`, it directly injects the column aliases.
Expand Down
25 changes: 25 additions & 0 deletions datafusion/sql/src/unparser/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,31 @@ pub(crate) fn find_unnest_node_within_select(plan: &LogicalPlan) -> Option<&Unne
}
}

/// Recursively searches children of [LogicalPlan] to find Unnest node if exist
/// until encountering a Relation node with single input
pub(crate) fn find_unnest_node_until_relation(plan: &LogicalPlan) -> Option<&Unnest> {
// Note that none of the nodes that have a corresponding node can have more
// than 1 input node. E.g. Projection / Filter always have 1 input node.
let input = plan.inputs();
let input = if input.len() > 1 {
return None;
} else {
input.first()?
};

if let LogicalPlan::Unnest(unnest) = input {
Some(unnest)
} else if let LogicalPlan::TableScan(_) = input {
None
} else if let LogicalPlan::Subquery(_) = input {
None
} else if let LogicalPlan::SubqueryAlias(_) = input {
None
} else {
find_unnest_node_within_select(input)
}
}

/// Recursively searches children of [LogicalPlan] to find Window nodes if exist
/// prior to encountering a Join, TableScan, or a nested subquery (derived table factor).
/// If Window node is not found prior to this or at all before reaching the end
Expand Down
24 changes: 24 additions & 0 deletions datafusion/sql/tests/cases/plan_to_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,30 @@ fn roundtrip_statement_with_dialect() -> Result<()> {
parser_dialect: Box::new(GenericDialect {}),
unparser_dialect: Box::new(CustomDialectBuilder::default().with_unnest_as_table_factor(true).build()),
},
TestStatementWithDialect {
sql: "SELECT * FROM unnest_table u, UNNEST(u.array_col)",
expected: r#"SELECT * FROM unnest_table AS u CROSS JOIN UNNEST(u.array_col)"#,
parser_dialect: Box::new(GenericDialect {}),
unparser_dialect: Box::new(CustomDialectBuilder::default().with_unnest_as_table_factor(true).build()),
},
TestStatementWithDialect {
sql: "SELECT * FROM unnest_table u, UNNEST(u.array_col) AS t1 (c1)",
expected: r#"SELECT * FROM unnest_table AS u CROSS JOIN UNNEST(u.array_col) AS t1 (c1)"#,
parser_dialect: Box::new(GenericDialect {}),
unparser_dialect: Box::new(CustomDialectBuilder::default().with_unnest_as_table_factor(true).build()),
},
TestStatementWithDialect {
sql: "SELECT * FROM unnest_table u, UNNEST(u.array_col)",
expected: r#"SELECT * FROM unnest_table AS u CROSS JOIN LATERAL (SELECT UNNEST(u.array_col) AS "UNNEST(outer_ref(u.array_col))")"#,
parser_dialect: Box::new(GenericDialect {}),
unparser_dialect: Box::new(UnparserDefaultDialect {}),
},
TestStatementWithDialect {
sql: "SELECT * FROM unnest_table u, UNNEST(u.array_col) AS t1 (c1)",
expected: r#"SELECT * FROM unnest_table AS u CROSS JOIN LATERAL (SELECT UNNEST(u.array_col) AS "UNNEST(outer_ref(u.array_col))") AS t1 (c1)"#,
parser_dialect: Box::new(GenericDialect {}),
unparser_dialect: Box::new(UnparserDefaultDialect {}),
},
];

for query in tests {
Expand Down
Loading