diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index b78bdf082f333..473f846c9313b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -1002,12 +1002,11 @@ object EliminateSorts extends Rule[LogicalPlan] { private def isOrderIrrelevantAggs(aggs: Seq[NamedExpression]): Boolean = { def isOrderIrrelevantAggFunction(func: AggregateFunction): Boolean = func match { - case _: Sum => true - case _: Min => true - case _: Max => true - case _: Count => true - case _: Average => true - case _: CentralMomentAgg => true + case _: Min | _: Max | _: Count => true + // Arithmetic operations for floating-point values are order-sensitive + // (they are not associative). + case _: Sum | _: Average | _: CentralMomentAgg => + !Seq(FloatType, DoubleType).exists(_.sameType(func.children.head.dataType)) case _ => false } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala index c117ee7818c01..5020c1047f8dd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala @@ -1271,6 +1271,23 @@ class SubquerySuite extends QueryTest with SharedSparkSession { } } + test("Cannot remove sort for floating-point order-sensitive aggregates from subquery") { + Seq("float", "double").foreach { typeName => + Seq("SUM", "AVG", "KURTOSIS", "SKEWNESS", "STDDEV_POP", "STDDEV_SAMP", + "VAR_POP", "VAR_SAMP").foreach { aggName => + val query = + s""" + |SELECT k, $aggName(v) FROM ( + | SELECT k, v + | FROM VALUES (1, $typeName(2.0)), (2, $typeName(1.0)) t(k, v) + | ORDER BY v) + |GROUP BY k + """.stripMargin + assert(getNumSortsInQuery(query) == 1) + } + } + } + test("SPARK-25482: Forbid pushdown to datasources of filters containing subqueries") { withTempView("t1", "t2") { sql("create temporary view t1(a int) using parquet")