Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support unparsing implicit lateral UNNEST plan to SQL text #13824

Merged
merged 5 commits into from
Dec 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions datafusion/expr/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2536,14 +2536,19 @@ pub fn schema_name_from_sorts(sorts: &[Sort]) -> Result<String, fmt::Error> {
Ok(s)
}

pub const OUTER_REFERENCE_COLUMN_PREFIX: &str = "outer_ref";
pub const UNNEST_COLUMN_PREFIX: &str = "UNNEST";

/// Format expressions for display as part of a logical plan. In many cases, this will produce
/// similar output to `Expr.name()` except that column names will be prefixed with '#'.
impl Display for Expr {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
match self {
Expr::Alias(Alias { expr, name, .. }) => write!(f, "{expr} AS {name}"),
Expr::Column(c) => write!(f, "{c}"),
Expr::OuterReferenceColumn(_, c) => write!(f, "outer_ref({c})"),
Expr::OuterReferenceColumn(_, c) => {
write!(f, "{OUTER_REFERENCE_COLUMN_PREFIX}({c})")
}
Expr::ScalarVariable(_, var_names) => write!(f, "{}", var_names.join(".")),
Expr::Literal(v) => write!(f, "{v:?}"),
Expr::Case(case) => {
Expand Down Expand Up @@ -2736,7 +2741,7 @@ impl Display for Expr {
},
Expr::Placeholder(Placeholder { id, .. }) => write!(f, "{id}"),
Expr::Unnest(Unnest { expr }) => {
write!(f, "UNNEST({expr})")
write!(f, "{UNNEST_COLUMN_PREFIX}({expr})")
}
}
}
Expand Down
83 changes: 70 additions & 13 deletions datafusion/sql/src/unparser/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,14 @@ use super::{
Unparser,
};
use crate::unparser::ast::UnnestRelationBuilder;
use crate::unparser::utils::unproject_agg_exprs;
use crate::unparser::utils::{find_unnest_node_until_relation, unproject_agg_exprs};
use crate::utils::UNNEST_PLACEHOLDER;
use datafusion_common::{
internal_err, not_impl_err,
tree_node::{TransformedResult, TreeNode},
Column, DataFusionError, Result, TableReference,
};
use datafusion_expr::expr::OUTER_REFERENCE_COLUMN_PREFIX;
use datafusion_expr::{
expr::Alias, BinaryExpr, Distinct, Expr, JoinConstraint, JoinType, LogicalPlan,
LogicalPlanBuilder, Operator, Projection, SortExpr, TableScan, Unnest,
Expand Down Expand Up @@ -235,9 +236,10 @@ impl Unparser<'_> {
plan: &LogicalPlan,
relation: &mut RelationBuilder,
alias: Option<ast::TableAlias>,
lateral: bool,
) -> Result<()> {
let mut derived_builder = DerivedRelationBuilder::default();
derived_builder.lateral(false).alias(alias).subquery({
derived_builder.lateral(lateral).alias(alias).subquery({
let inner_statement = self.plan_to_sql(plan)?;
if let ast::Statement::Query(inner_query) = inner_statement {
inner_query
Expand All @@ -257,15 +259,17 @@ impl Unparser<'_> {
alias: &str,
plan: &LogicalPlan,
relation: &mut RelationBuilder,
lateral: bool,
) -> Result<()> {
if self.dialect.requires_derived_table_alias() {
self.derive(
plan,
relation,
Some(self.new_table_alias(alias.to_string(), vec![])),
lateral,
)
} else {
self.derive(plan, relation, None)
self.derive(plan, relation, None, lateral)
}
}

Expand Down Expand Up @@ -317,10 +321,12 @@ impl Unparser<'_> {
// Projection can be top-level plan for unnest relation
// The projection generated by the `RecursiveUnnestRewriter` from a UNNEST relation will have
// only one expression, which is the placeholder column generated by the rewriter.
if self.dialect.unnest_as_table_factor()
&& p.expr.len() == 1
&& Self::is_unnest_placeholder(&p.expr[0])
{
let unnest_input_type = if p.expr.len() == 1 {
Self::check_unnest_placeholder_with_outer_ref(&p.expr[0])
} else {
None
};
if self.dialect.unnest_as_table_factor() && unnest_input_type.is_some() {
if let LogicalPlan::Unnest(unnest) = &p.input.as_ref() {
return self
.unnest_to_table_factor_sql(unnest, query, select, relation);
Expand All @@ -333,6 +339,9 @@ impl Unparser<'_> {
"derived_projection",
plan,
relation,
unnest_input_type
.filter(|t| matches!(t, UnnestInputType::OuterReference))
.is_some(),
);
}
self.reconstruct_select_statement(plan, p, select)?;
Expand Down Expand Up @@ -365,6 +374,7 @@ impl Unparser<'_> {
"derived_limit",
plan,
relation,
false,
);
}
if let Some(fetch) = &limit.fetch {
Expand Down Expand Up @@ -402,6 +412,7 @@ impl Unparser<'_> {
"derived_sort",
plan,
relation,
false,
);
}
let Some(query_ref) = query else {
Expand Down Expand Up @@ -472,6 +483,7 @@ impl Unparser<'_> {
"derived_distinct",
plan,
relation,
false,
);
}
let (select_distinct, input) = match distinct {
Expand Down Expand Up @@ -658,6 +670,7 @@ impl Unparser<'_> {
"derived_union",
plan,
relation,
false,
);
}

Expand Down Expand Up @@ -723,19 +736,54 @@ impl Unparser<'_> {
internal_err!("Unnest input is not a Projection: {unnest:?}")
}
}
_ => not_impl_err!("Unsupported operator: {plan:?}"),
LogicalPlan::Subquery(subquery)
if find_unnest_node_until_relation(subquery.subquery.as_ref())
.is_some() =>
{
if self.dialect.unnest_as_table_factor() {
self.select_to_sql_recursively(
subquery.subquery.as_ref(),
query,
select,
relation,
)
} else {
self.derive_with_dialect_alias(
"derived_unnest",
subquery.subquery.as_ref(),
relation,
true,
)
}
}
_ => {
not_impl_err!("Unsupported operator: {plan:?}")
}
}
}

/// Try to find the placeholder column name generated by `RecursiveUnnestRewriter`
/// Only match the pattern `Expr::Alias(Expr::Column("__unnest_placeholder(...)"))`
fn is_unnest_placeholder(expr: &Expr) -> bool {
/// Try to find the placeholder column name generated by `RecursiveUnnestRewriter`.
///
/// - If the column is a placeholder column matching the pattern `Expr::Alias(Expr::Column("__unnest_placeholder(...)"))`,
/// it is a scalar column: return [UnnestInputType::Scalar].
/// - If the column is a placeholder column matching the pattern `Expr::Alias(Expr::Column("__unnest_placeholder(outer_ref(...))"))`,
/// it is an outer reference column: return [UnnestInputType::OuterReference].
/// - If the column is not a placeholder column, return [None].
///
/// `outer_ref` is the display result of [Expr::OuterReferenceColumn]
fn check_unnest_placeholder_with_outer_ref(expr: &Expr) -> Option<UnnestInputType> {
if let Expr::Alias(Alias { expr, .. }) = expr {
if let Expr::Column(Column { name, .. }) = expr.as_ref() {
return name.starts_with(UNNEST_PLACEHOLDER);
if let Some(prefix) = name.strip_prefix(UNNEST_PLACEHOLDER) {
if prefix.starts_with(&format!("({}(", OUTER_REFERENCE_COLUMN_PREFIX))
{
return Some(UnnestInputType::OuterReference);
}
return Some(UnnestInputType::Scalar);
}
}
}
false
None
}

fn unnest_to_table_factor_sql(
Expand Down Expand Up @@ -1092,3 +1140,12 @@ impl From<BuilderError> for DataFusionError {
DataFusionError::External(Box::new(e))
}
}

/// The type of the input to the UNNEST table factor.
///
/// This is the classification produced by `check_unnest_placeholder_with_outer_ref`
/// from the name of an unnest placeholder column.
#[derive(Debug)]
enum UnnestInputType {
/// The input is an outer reference column, i.e. the placeholder name wraps
/// `outer_ref(column_name)`, which is the `Display` form of
/// `Expr::OuterReferenceColumn`.
OuterReference,
/// The input is a scalar value (the placeholder name does not wrap `outer_ref(`).
/// It will be presented like a scalar array or struct.
Scalar,
}
58 changes: 55 additions & 3 deletions datafusion/sql/src/unparser/rewrite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use datafusion_common::{
tree_node::{Transformed, TransformedResult, TreeNode, TreeNodeRewriter},
Column, HashMap, Result, TableReference,
};
use datafusion_expr::expr::Alias;
use datafusion_expr::expr::{Alias, UNNEST_COLUMN_PREFIX};
use datafusion_expr::{Expr, LogicalPlan, Projection, Sort, SortExpr};
use sqlparser::ast::Ident;

Expand Down Expand Up @@ -190,10 +190,11 @@ pub(super) fn rewrite_plan_for_sort_on_non_projected_fields(
}
}

/// This logic is to work out the columns and inner query for SubqueryAlias plan for both types of
/// subquery
/// This logic is to work out the columns and inner query for SubqueryAlias plan for some types of
/// subquery or unnest
/// - `(SELECT column_a as a from table) AS A`
/// - `(SELECT column_a from table) AS A (a)`
/// - `SELECT * FROM t1 CROSS JOIN UNNEST(t1.c1) AS u(c1)` (see [find_unnest_column_alias])
///
/// A roundtrip example for table alias with columns
///
Expand Down Expand Up @@ -222,6 +223,15 @@ pub(super) fn subquery_alias_inner_query_and_columns(
) -> (&LogicalPlan, Vec<Ident>) {
let plan: &LogicalPlan = subquery_alias.input.as_ref();

if let LogicalPlan::Subquery(subquery) = plan {
let (inner_projection, Some(column)) =
find_unnest_column_alias(subquery.subquery.as_ref())
else {
return (plan, vec![]);
};
return (inner_projection, vec![Ident::new(column)]);
}

let LogicalPlan::Projection(outer_projections) = plan else {
return (plan, vec![]);
};
Expand Down Expand Up @@ -257,6 +267,48 @@ pub(super) fn subquery_alias_inner_query_and_columns(
(outer_projections.input.as_ref(), columns)
}

/// Looks for the column alias given to an UNNEST in the top projection of `plan`.
///
/// For example, the query
/// ```sql
/// SELECT * FROM t1 CROSS JOIN UNNEST(t1.c1) AS u(c1)
/// ```
/// is parsed into the following plan:
/// ```text
/// Projection: *
///   Cross Join:
///     SubqueryAlias: t1
///       TableScan: t
///     SubqueryAlias: u
///       Subquery:
///         Projection: UNNEST(outer_ref(t1.c1)) AS c1
///           Projection: __unnest_placeholder(outer_ref(t1.c1),depth=1) AS UNNEST(outer_ref(t1.c1))
///             Unnest: lists[__unnest_placeholder(outer_ref(t1.c1))|depth=1] structs[]
///               Projection: outer_ref(t1.c1) AS __unnest_placeholder(outer_ref(t1.c1))
///                 EmptyRelation
/// ```
///
/// When `plan` is a projection holding exactly one expression, that expression is an
/// alias, and the schema name of the aliased expression starts with `UNNEST(` (the
/// `Display` result of [Expr::Unnest]), the projection's input and the alias name
/// (`c1` above) are returned. In every other case `plan` itself is returned together
/// with `None`.
pub(super) fn find_unnest_column_alias(
    plan: &LogicalPlan,
) -> (&LogicalPlan, Option<String>) {
    // Anything that is not a projection cannot carry the UNNEST alias.
    let LogicalPlan::Projection(projection) = plan else {
        return (plan, None);
    };

    // The projection must consist of a single aliased expression.
    let [Expr::Alias(alias)] = projection.expr.as_slice() else {
        return (plan, None);
    };

    let unnest_prefix = format!("{UNNEST_COLUMN_PREFIX}(");
    let aliased_name = alias.expr.schema_name().to_string();
    if aliased_name.starts_with(&unnest_prefix) {
        (projection.input.as_ref(), Some(alias.name.clone()))
    } else {
        (plan, None)
    }
}

/// Injects column aliases into a subquery's logical plan. The function searches for a `Projection`
/// within the given plan, which may be wrapped by other operators (e.g., LIMIT, SORT).
/// If the top-level plan is a `Projection`, it directly injects the column aliases.
Expand Down
25 changes: 25 additions & 0 deletions datafusion/sql/src/unparser/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,31 @@ pub(crate) fn find_unnest_node_within_select(plan: &LogicalPlan) -> Option<&Unne
}
}

/// Searches below `plan` for an [Unnest] node, giving up as soon as a relation
/// boundary is reached.
///
/// Only the single input of `plan` is inspected:
/// - no input, or more than one input: returns `None`;
/// - the input is an `Unnest`: returns it;
/// - the input is a `TableScan`, `Subquery` or `SubqueryAlias`: returns `None`;
/// - anything else: the rest of the search is delegated to
///   `find_unnest_node_within_select`, starting from that input.
pub(crate) fn find_unnest_node_until_relation(plan: &LogicalPlan) -> Option<&Unnest> {
    let children = plan.inputs();
    // A node with several inputs is not followed; the search only walks
    // single-input chains such as Projection / Filter.
    if children.len() > 1 {
        return None;
    }
    let child: &LogicalPlan = children.first().copied()?;

    match child {
        LogicalPlan::Unnest(unnest) => Some(unnest),
        // Relation boundaries: stop searching.
        LogicalPlan::TableScan(_)
        | LogicalPlan::Subquery(_)
        | LogicalPlan::SubqueryAlias(_) => None,
        other => find_unnest_node_within_select(other),
    }
}

/// Recursively searches children of [LogicalPlan] to find Window nodes if exist
/// prior to encountering a Join, TableScan, or a nested subquery (derived table factor).
/// If Window node is not found prior to this or at all before reaching the end
Expand Down
24 changes: 24 additions & 0 deletions datafusion/sql/tests/cases/plan_to_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,30 @@ fn roundtrip_statement_with_dialect() -> Result<()> {
parser_dialect: Box::new(GenericDialect {}),
unparser_dialect: Box::new(CustomDialectBuilder::default().with_unnest_as_table_factor(true).build()),
},
TestStatementWithDialect {
sql: "SELECT * FROM unnest_table u, UNNEST(u.array_col)",
expected: r#"SELECT * FROM unnest_table AS u CROSS JOIN UNNEST(u.array_col)"#,
parser_dialect: Box::new(GenericDialect {}),
unparser_dialect: Box::new(CustomDialectBuilder::default().with_unnest_as_table_factor(true).build()),
},
TestStatementWithDialect {
sql: "SELECT * FROM unnest_table u, UNNEST(u.array_col) AS t1 (c1)",
expected: r#"SELECT * FROM unnest_table AS u CROSS JOIN UNNEST(u.array_col) AS t1 (c1)"#,
parser_dialect: Box::new(GenericDialect {}),
unparser_dialect: Box::new(CustomDialectBuilder::default().with_unnest_as_table_factor(true).build()),
},
TestStatementWithDialect {
sql: "SELECT * FROM unnest_table u, UNNEST(u.array_col)",
expected: r#"SELECT * FROM unnest_table AS u CROSS JOIN LATERAL (SELECT UNNEST(u.array_col) AS "UNNEST(outer_ref(u.array_col))")"#,
parser_dialect: Box::new(GenericDialect {}),
unparser_dialect: Box::new(UnparserDefaultDialect {}),
},
TestStatementWithDialect {
sql: "SELECT * FROM unnest_table u, UNNEST(u.array_col) AS t1 (c1)",
expected: r#"SELECT * FROM unnest_table AS u CROSS JOIN LATERAL (SELECT UNNEST(u.array_col) AS "UNNEST(outer_ref(u.array_col))") AS t1 (c1)"#,
parser_dialect: Box::new(GenericDialect {}),
unparser_dialect: Box::new(UnparserDefaultDialect {}),
},
];

for query in tests {
Expand Down
Loading