StatefulAggregationStrategy Execution Planning Strategy for EventTimeWatermark and Aggregate Logical Operators
StatefulAggregationStrategy
is an execution planning strategy (i.e. Strategy
) that IncrementalExecution uses to plan EventTimeWatermark
and Aggregate
logical operators in streaming Datasets.
Note
|
EventTimeWatermark logical operator is the result of withWatermark operator. |
Note
|
Aggregate logical operator represents groupBy and groupByKey aggregations (and SQL’s GROUP BY clause).
|
StatefulAggregationStrategy
is available using SessionState
.
spark.sessionState.planner.StatefulAggregationStrategy
Logical Operator | Physical Operator | ||
---|---|---|---|
In the order of preference:
|
val counts = spark.
readStream.
format("rate").
load.
groupBy(window($"timestamp", "5 seconds") as "group").
agg(count("value") as "count").
orderBy("group")
scala> counts.explain
== Physical Plan ==
*Sort [group#6 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(group#6 ASC NULLS FIRST, 200)
+- *HashAggregate(keys=[window#13], functions=[count(value#1L)])
+- StateStoreSave [window#13], StatefulOperatorStateInfo(<unknown>,736d67c2-6daa-4c4c-9c4b-c12b15af20f4,0,0), Append, 0
+- *HashAggregate(keys=[window#13], functions=[merge_count(value#1L)])
+- StateStoreRestore [window#13], StatefulOperatorStateInfo(<unknown>,736d67c2-6daa-4c4c-9c4b-c12b15af20f4,0,0)
+- *HashAggregate(keys=[window#13], functions=[merge_count(value#1L)])
+- Exchange hashpartitioning(window#13, 200)
+- *HashAggregate(keys=[window#13], functions=[partial_count(value#1L)])
+- *Project [named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) as double) = (cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) THEN (CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) + 1) ELSE CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) END + 0) - 1) * 5000000) + 0), LongType, TimestampType), end, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) as double) = (cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) THEN (CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) + 1) ELSE CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) END + 0) - 1) * 5000000) + 5000000), LongType, TimestampType)) AS window#13, value#1L]
+- *Filter isnotnull(timestamp#0)
+- StreamingRelation rate, [timestamp#0, value#1L]
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import scala.concurrent.duration._
val consoleOutput = counts.
writeStream.
format("console").
option("truncate", false).
trigger(Trigger.ProcessingTime(10.seconds)).
queryName("counts").
outputMode(OutputMode.Complete). // <-- required for groupBy
start
// Eventually...
consoleOutput.stop
Selecting Aggregate Physical Operator Given Aggregate Expressions — AggUtils.planStreamingAggregation
Internal Method
planStreamingAggregation(
groupingExpressions: Seq[NamedExpression],
functionsWithoutDistinct: Seq[AggregateExpression],
resultExpressions: Seq[NamedExpression],
child: SparkPlan): Seq[SparkPlan]
planStreamingAggregation
takes the grouping attributes (from groupingExpressions
).
Note
|
groupingExpressions corresponds to the grouping function in groupBy operator.
|
planStreamingAggregation
creates an aggregate physical operator (called partialAggregate
) with:
-
requiredChildDistributionExpressions
undefined (i.e.None
) -
initialInputBufferOffset
as0
-
functionsWithoutDistinct
inPartial
mode -
child
operator as the inputchild
Note
|
|
planStreamingAggregation
creates an aggregate physical operator (called partialMerged1
) with:
-
requiredChildDistributionExpressions
based on the inputgroupingExpressions
-
initialInputBufferOffset
as the length ofgroupingExpressions
-
functionsWithoutDistinct
inPartialMerge
mode -
child
operator as partialAggregate aggregate physical operator created above
planStreamingAggregation
creates StateStoreRestoreExec with the grouping attributes, undefined StatefulOperatorStateInfo
, and partialMerged1 aggregate physical operator created above.
planStreamingAggregation
creates an aggregate physical operator (called partialMerged2
) with:
-
child
operator as StateStoreRestoreExec physical operator created above
Note
|
The only difference between partialMerged1 and partialMerged2 steps is the child physical operator. |
planStreamingAggregation
creates StateStoreSaveExec with:
-
the grouping attributes based on the input
groupingExpressions
-
No
stateInfo
,outputMode
andeventTimeWatermark
-
child
operator as partialMerged2 aggregate physical operator created above
In the end, planStreamingAggregation
creates the final aggregate physical operator (called finalAndCompleteAggregate
) with:
-
requiredChildDistributionExpressions
based on the inputgroupingExpressions
-
initialInputBufferOffset
as the length ofgroupingExpressions
-
functionsWithoutDistinct
inFinal
mode -
child
operator as StateStoreSaveExec physical operator created above
Note
|
planStreamingAggregation is used exclusively when StatefulAggregationStrategy plans a streaming aggregation.
|