Skip to content

Commit

Permalink
[SPARK-50403][SQL] Fix parameterized EXECUTE IMMEDIATE
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
1. Remove the assert of single parameterized query because it restricts parameterization of `EXECUTE IMMEDIATE`. The assert checks that only single node of the type `ParameterizedQuery` presents in a query, but `EXECUTE IMMEDIATE` adds one `ParameterizedQuery` + `sql()` adds another `ParameterizedQuery`. So, this case is prohibited by the assert though it is a valid use case from user's perspective.
2. Modify parameters binding: stop the bind procedure when face to another parameterized query. For example, the sql text passed to `spark.sql()` contains `EXECUTE IMMEDIATE`, and `sql()` parameters don't affect on the sql query string in `EXECUTE IMMEDIATE`.
3. Allow parameters in `EXECUTE IMMEDIATE` variables.

### Why are the changes needed?
Before the changes, the following query fails with the internal error:
```scala
scala> spark.sql("execute immediate 'select ?' using 1", Map("param1" -> "1"))
org.apache.spark.SparkException: [INTERNAL_ERROR] The Spark SQL phase analysis failed with an internal error. You hit a bug in Spark or the Spark plugins you use. Please, report this bug to the corresponding communities or vendors, and provide the full stack trace. SQLSTATE: XX000
```

### Does this PR introduce _any_ user-facing change?
Yes, the query above returns proper results instead of the internal error:
```scala
scala> spark.sql("execute immediate 'select ?' using 1", Map("param1" -> "1"))
val res2: org.apache.spark.sql.DataFrame = [1: int]
```

### How was this patch tested?
By running the new test:
```
$ build/sbt "sql/test:testOnly org.apache.spark.sql.ParametersSuite"
```

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes apache#49442 from MaxGekk/fix-params-execute-immediate.

Authored-by: Max Gekk <[email protected]>
Signed-off-by: Max Gekk <[email protected]>
  • Loading branch information
MaxGekk committed Jan 14, 2025
1 parent 2d498d5 commit 1fd8362
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,18 @@ class SubstituteExecuteImmediate(val catalogManager: CatalogManager)
def resolveVariable(e: Expression): Expression = {

/**
* We know that the expression is either UnresolvedAttribute or Alias, as passed from the
* parser. If it is an UnresolvedAttribute, we look it up in the catalog and return it. If it
* is an Alias, we resolve the child and return an Alias with the same name.
* We know that the expression is either UnresolvedAttribute, Alias or Parameter, as passed from
* the parser. If it is an UnresolvedAttribute, we look it up in the catalog and return it. If
* it is an Alias, we resolve the child and return an Alias with the same name. If it is
* a Parameter, we leave it as is because the parameter belongs to another parameterized
* query and should be resolved later.
*/
e match {
case u: UnresolvedAttribute =>
getVariableReference(u, u.nameParts)
case a: Alias =>
Alias(resolveVariable(a.child), a.name)()
case p: Parameter => p
case other =>
throw QueryCompilationErrors.unsupportedParameterExpression(other)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,18 +104,6 @@ case class PosParameterizedQuery(child: LogicalPlan, args: Seq[Expression])
copy(child = newChild)
}

/**
* Base class for rules that process parameterized queries.
*/
abstract class ParameterizedQueryProcessor extends Rule[LogicalPlan] {
def assertUnresolvedPlanHasSingleParameterizedQuery(plan: LogicalPlan): Unit = {
if (plan.containsPattern(PARAMETERIZED_QUERY)) {
val parameterizedQueries = plan.collect { case p: ParameterizedQuery => p }
assert(parameterizedQueries.length == 1)
}
}
}

/**
* Moves `ParameterizedQuery` inside `SupervisingCommand` for their supervised plans to be
* resolved later by the analyzer.
Expand All @@ -127,10 +115,8 @@ abstract class ParameterizedQueryProcessor extends Rule[LogicalPlan] {
* `PosParameterizedQuery(ExplainCommand(ExplainCommand(SomeQuery(...))))` =>
* `ExplainCommand(ExplainCommand(PosParameterizedQuery(SomeQuery(...))))`
*/
object MoveParameterizedQueriesDown extends ParameterizedQueryProcessor {
object MoveParameterizedQueriesDown extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = {
assertUnresolvedPlanHasSingleParameterizedQuery(plan)

plan.resolveOperatorsWithPruning(_.containsPattern(PARAMETERIZED_QUERY)) {
case pq: ParameterizedQuery if pq.exists(isSupervisingCommand) =>
moveParameterizedQueryIntoSupervisingCommand(pq)
Expand Down Expand Up @@ -161,7 +147,7 @@ object MoveParameterizedQueriesDown extends ParameterizedQueryProcessor {
* by collection constructor functions such as `map()`, `array()`, `struct()`
* from the user-specified arguments.
*/
object BindParameters extends ParameterizedQueryProcessor with QueryErrorsBase {
object BindParameters extends Rule[LogicalPlan] with QueryErrorsBase {
private def checkArgs(args: Iterable[(String, Expression)]): Unit = {
def isNotAllowed(expr: Expression): Boolean = expr.exists {
case _: Literal | _: CreateArray | _: CreateNamedStruct |
Expand All @@ -176,15 +162,18 @@ object BindParameters extends ParameterizedQueryProcessor with QueryErrorsBase {
}
}

private def bind(p: LogicalPlan)(f: PartialFunction[Expression, Expression]): LogicalPlan = {
p.resolveExpressionsWithPruning(_.containsPattern(PARAMETER)) (f orElse {
case sub: SubqueryExpression => sub.withNewPlan(bind(sub.plan)(f))
})
private def bind(p0: LogicalPlan)(f: PartialFunction[Expression, Expression]): LogicalPlan = {
var stop = false
p0.resolveOperatorsDownWithPruning(_.containsPattern(PARAMETER) && !stop) {
case p1 =>
stop = p1.isInstanceOf[ParameterizedQuery]
p1.transformExpressionsWithPruning(_.containsPattern(PARAMETER)) (f orElse {
case sub: SubqueryExpression => sub.withNewPlan(bind(sub.plan)(f))
})
}
}

override def apply(plan: LogicalPlan): LogicalPlan = {
assertUnresolvedPlanHasSingleParameterizedQuery(plan)

plan.resolveOperatorsWithPruning(_.containsPattern(PARAMETERIZED_QUERY)) {
// We should wait for `CTESubstitution` to resolve CTE before binding parameters, as CTE
// relations are not children of `UnresolvedWith`.
Expand Down
32 changes: 32 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -769,4 +769,36 @@ class ParametersSuite extends QueryTest with SharedSparkSession with PlanTest {
checkAnswer(spark.sql(query(":cte"), args = Map("cte" -> "t1")), Row(1))
checkAnswer(spark.sql(query("?"), args = Array("t1")), Row(1))
}

test("SPARK-50403: parameterized execute immediate") {
checkAnswer(spark.sql("execute immediate 'select ?' using ?", Array(1)), Row(1))
checkAnswer(spark.sql("execute immediate 'select ?, ?' using ?, 2", Array(1)), Row(1, 2))
checkError(
exception = intercept[AnalysisException] {
spark.sql("execute immediate 'select ?, ?' using 1", Array(2))
},
condition = "UNBOUND_SQL_PARAMETER",
parameters = Map("name" -> "_10"),
context = ExpectedContext("?", 10, 10))

checkAnswer(spark.sql("execute immediate 'select ?' using 1", Map("param1" -> "1")), Row(1))
checkAnswer(spark.sql("execute immediate 'select :param1' using :param2 as param1",
Map("param2" -> 42)), Row(42))
checkAnswer(spark.sql(
"execute immediate 'select :param1, :param2' using :param2 as param1, 43 as param2",
Map("param2" -> 42)), Row(42, 43))
checkAnswer(spark.sql("execute immediate 'select :param' using 0 as param",
Map("param" -> 42)), Row(0))
checkError(
exception = intercept[AnalysisException] {
spark.sql("execute immediate 'select :param1, :param2' using 1 as param1",
Map("param2" -> 2))
},
condition = "UNBOUND_SQL_PARAMETER",
parameters = Map("name" -> "param2"),
context = ExpectedContext(":param2", 16, 22))

checkAnswer(spark.sql("execute immediate 'select ?' using :param", Map("param" -> 2)), Row(2))
checkAnswer(spark.sql("execute immediate 'select :param' using ? as param", Array(3)), Row(3))
}
}

0 comments on commit 1fd8362

Please sign in to comment.