StateStoreSaveExec is a unary physical operator (i.e. UnaryExecNode) that saves a streaming state to a state store with support for streaming watermark.
StateStoreSaveExec is created exclusively when StatefulAggregationStrategy plans streaming aggregate operators (aka streaming aggregates).
The optional properties (StatefulOperatorStateInfo, output mode, and event-time watermark) are undefined when StateStoreSaveExec is created. StateStoreSaveExec is updated to hold the streaming batch-specific execution properties when IncrementalExecution prepares a streaming physical plan for execution (that is, when the state preparation rule is executed as StreamExecution plans a streaming query for a streaming batch).
Note
|
Unlike StateStoreRestoreExec operator, StateStoreSaveExec takes output mode and event time watermark when created.
|
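The life cycle of the optional properties can be sketched in plain Scala. This is only a toy model under stated assumptions: ToyStateInfo, ToySaveOperator, prepareForBatch and the checkpoint path are hypothetical names made up for illustration, not Spark classes or APIs. The only idea taken from the text is that the properties are undefined at creation time and defined once the plan is prepared for a batch.

```scala
object OptionalPropertiesSketch {
  // Hypothetical stand-in for StatefulOperatorStateInfo
  final case class ToyStateInfo(checkpointLocation: String, operatorId: Long, batchId: Long)

  // Hypothetical stand-in for the operator: the three properties default to None
  final case class ToySaveOperator(
      keys: Seq[String],
      stateInfo: Option[ToyStateInfo] = None,
      outputMode: Option[String] = None,
      eventTimeWatermarkMs: Option[Long] = None)

  // Models the preparation step: returns a copy with the batch-specific properties defined
  def prepareForBatch(
      op: ToySaveOperator,
      info: ToyStateInfo,
      mode: String,
      watermarkMs: Long): ToySaveOperator =
    op.copy(
      stateInfo = Some(info),
      outputMode = Some(mode),
      eventTimeWatermarkMs = Some(watermarkMs))

  def main(args: Array[String]): Unit = {
    val planned = ToySaveOperator(keys = Seq("window"))
    // "/tmp/checkpoint" is a made-up location used for illustration only
    val prepared = prepareForBatch(planned, ToyStateInfo("/tmp/checkpoint", 0L, 0L), "Append", 0L)
    println(planned)
    println(prepared)
  }
}
```

Modelling the properties as Option values with None defaults keeps the planned operator and the prepared operator the same type, which is why a copy is enough to move from one to the other.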
When executed, StateStoreSaveExec creates a StateStoreRDD to map over partitions with storeUpdateFunction that manages the StateStore.
scala> spark.version
res0: String = 2.3.0-SNAPSHOT
// START: Only for easier debugging
// The state is then only for one partition
// which should make monitoring it easier
import org.apache.spark.sql.internal.SQLConf.SHUFFLE_PARTITIONS
spark.sessionState.conf.setConf(SHUFFLE_PARTITIONS, 1)
scala> spark.sessionState.conf.numShufflePartitions
res2: Int = 1
// END: Only for easier debugging
val counts = spark.
readStream.
format("rate").
load.
groupBy(window($"timestamp", "5 seconds") as "group").
agg(count("value") as "value_count") // <-- creates an Aggregate logical operator
scala> counts.explain(true)
== Parsed Logical Plan ==
'Aggregate [timewindow('timestamp, 5000000, 5000000, 0) AS window#5 AS group#6], [timewindow('timestamp, 5000000, 5000000, 0) AS window#5 AS group#6, count('value) AS value_count#12]
+- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@489cbbcb,rate,List(),None,List(),None,Map(),None), rate, [timestamp#0, value#1L]
...
== Physical Plan ==
*HashAggregate(keys=[window#18], functions=[count(value#1L)], output=[group#6, value_count#12L])
+- StateStoreSave [window#18], StatefulOperatorStateInfo(<unknown>,9a6d381e-1066-4e2c-abd2-27884a6c2d16,0,0), Append, 0
+- *HashAggregate(keys=[window#18], functions=[merge_count(value#1L)], output=[window#18, count#20L])
+- StateStoreRestore [window#18], StatefulOperatorStateInfo(<unknown>,9a6d381e-1066-4e2c-abd2-27884a6c2d16,0,0)
+- *HashAggregate(keys=[window#18], functions=[merge_count(value#1L)], output=[window#18, count#20L])
+- Exchange hashpartitioning(window#18, 1)
+- *HashAggregate(keys=[window#18], functions=[partial_count(value#1L)], output=[window#18, count#20L])
+- *Project [named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) as double) = (cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) THEN (CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) + 1) ELSE CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) END + 0) - 1) * 5000000) + 0), LongType, TimestampType), end, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) as double) = (cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) THEN (CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) + 1) ELSE CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) END + 0) - 1) * 5000000) + 5000000), LongType, TimestampType)) AS window#18, value#1L]
+- *Filter isnotnull(timestamp#0)
+- StreamingRelation rate, [timestamp#0, value#1L]
// Start the query and hence execute StateStoreSaveExec
import scala.concurrent.duration._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
val sq = counts.
writeStream.
format("console").
option("truncate", false).
trigger(Trigger.ProcessingTime(1.hour)). // <-- should be enough time for exploration
outputMode(OutputMode.Complete).
start
// wait till the first batch which should happen right after start
import org.apache.spark.sql.execution.streaming._
val streamingBatch = sq.asInstanceOf[StreamingQueryWrapper].streamingQuery.lastExecution
scala> println(streamingBatch.logical.numberedTreeString)
00 Aggregate [window#13], [window#13 AS group#6, count(value#25L) AS value_count#12L]
01 +- Filter isnotnull(timestamp#24)
02 +- Project [named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#24, TimestampType, LongType) - 0) as double) / cast(5000000 as double))) as double) = (cast((precisetimestampconversion(timestamp#24, TimestampType, LongType) - 0) as double) / cast(5000000 as double))) THEN (CEIL((cast((precisetimestampconversion(timestamp#24, TimestampType, LongType) - 0) as double) / cast(5000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(timestamp#24, TimestampType, LongType) - 0) as double) / cast(5000000 as double))) END + cast(0 as bigint)) - cast(1 as bigint)) * 5000000) + 0), LongType, TimestampType), end, precisetimestampconversion((((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#24, TimestampType, LongType) - 0) as double) / cast(5000000 as double))) as double) = (cast((precisetimestampconversion(timestamp#24, TimestampType, LongType) - 0) as double) / cast(5000000 as double))) THEN (CEIL((cast((precisetimestampconversion(timestamp#24, TimestampType, LongType) - 0) as double) / cast(5000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(timestamp#24, TimestampType, LongType) - 0) as double) / cast(5000000 as double))) END + cast(0 as bigint)) - cast(1 as bigint)) * 5000000) + 0) + 5000000), LongType, TimestampType)) AS window#13, timestamp#24, value#25L]
03 +- LogicalRDD [timestamp#24, value#25L], true
// Note the number of partitions
// 200 is the default, but we have changed it above
scala> println(streamingBatch.toRdd.toDebugString)
(1) MapPartitionsRDD[20] at toRdd at <console>:38 []
| StateStoreRDD[19] at toRdd at <console>:38 []
| MapPartitionsRDD[18] at toRdd at <console>:38 []
| StateStoreRDD[17] at toRdd at <console>:38 []
| MapPartitionsRDD[16] at toRdd at <console>:38 []
| ShuffledRowRDD[4] at start at <console>:36 []
+-(0) MapPartitionsRDD[3] at start at <console>:36 []
| MapPartitionsRDD[2] at start at <console>:36 []
| MapPartitionsRDD[1] at start at <console>:36 []
| EmptyRDD[0] at start at <console>:36 []
scala> spark.sessionState.conf.numShufflePartitions
res6: Int = 1
Note
|
The number of partitions of StateStoreRDD (and hence the number of Spark tasks) is what was defined for the child physical plan. There will be that many |
Note
|
StateStoreSaveExec behaves differently per output mode.
|
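The relationship between partitions and the store update function can be modelled without Spark. The sketch below is an assumption-laden toy: ToyStore, storeUpdateFunction (as written here) and runPartitions are hypothetical, a mutable map stands in for a StateStore, and a sequence of sequences stands in for the partitions of the child plan. It shows the function being applied once per partition, each with its own store.

```scala
import scala.collection.mutable

object PartitionedStateSketch {
  type Row = (String, Long)                   // (grouping key, aggregated count)
  type ToyStore = mutable.Map[String, Long]   // hypothetical stand-in for a StateStore

  // Hypothetical store update function: saves every row and passes it along
  def storeUpdateFunction(store: ToyStore, rows: Iterator[Row]): Iterator[Row] =
    rows.map { case row @ (key, count) =>
      store.update(key, count)
      row
    }

  // Applies the function once per partition, each partition with its own store
  def runPartitions(partitions: Seq[Seq[Row]]): (Seq[ToyStore], Seq[Seq[Row]]) = {
    val stores = partitions.map(_ => mutable.Map.empty[String, Long])
    val outputs = partitions.zip(stores).map { case (rows, store) =>
      storeUpdateFunction(store, rows.iterator).toList
    }
    (stores, outputs)
  }

  def main(args: Array[String]): Unit = {
    val (stores, outputs) = runPartitions(Seq(Seq(("a", 1L), ("b", 2L)), Seq(("c", 3L))))
    println(s"stores: ${stores.size}, rows passed along: ${outputs.flatten.size}")
  }
}
```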
| Key | Name (in UI) | Description |
|---|---|---|
| | total time to update rows | |
| | total time to remove rows | |
| | time to commit changes | |
| | number of output rows | |
| | number of total state rows | Number of the state keys in the state store. Corresponds to … |
| numUpdatedStateRows | number of updated state rows | Number of the state keys that were stored as updates in the state store in a trigger and for the keys in the result rows of the upstream physical operator. |
| | memory used by state | Memory used by the StateStore |
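To make a few of the metrics above concrete, here is a minimal sketch of how such numbers could be collected while rows are saved. It is not Spark's code: ToyMetrics, timeTakenMs and saveWithMetrics are hypothetical helpers, a mutable map stands in for the state store, and the toy commit does nothing.

```scala
import scala.collection.mutable

object StateMetricsSketch {
  // Hypothetical holder for four of the metrics listed in the table
  final case class ToyMetrics(
      updateTimeMs: Long,          // "total time to update rows"
      commitTimeMs: Long,          // "time to commit changes"
      numUpdatedStateRows: Long,   // "number of updated state rows"
      numTotalStateRows: Long)     // "number of total state rows"

  // Runs a block and returns the elapsed wall-clock time in milliseconds
  def timeTakenMs(body: => Unit): Long = {
    val start = System.nanoTime()
    body
    (System.nanoTime() - start) / 1000000L
  }

  def saveWithMetrics(store: mutable.Map[String, Long], rows: Seq[(String, Long)]): ToyMetrics = {
    var updated = 0L
    val updateMs = timeTakenMs {
      rows.foreach { case (key, count) =>
        store.update(key, count)   // one state row written per input row
        updated += 1
      }
    }
    // A real store would persist a new version here; the toy store has nothing to commit
    val commitMs = timeTakenMs(())
    ToyMetrics(updateMs, commitMs, updated, store.size.toLong)
  }

  def main(args: Array[String]): Unit =
    println(saveWithMetrics(mutable.Map("a" -> 1L), Seq(("a", 5L), ("b", 2L))))
}
```

Note that the updated-rows count is per call (per trigger in the text), while the total-rows count reflects everything the store holds afterwards.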
When executed, StateStoreSaveExec executes the child physical operator and creates a StateStoreRDD (with storeUpdateFunction specific to the output mode).
The output schema of StateStoreSaveExec is exactly the child's output schema.
The output partitioning of StateStoreSaveExec is exactly the child's output partitioning.
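The delegation of output schema and output partitioning to the child can be pictured with a small toy model. ToyPlan, ToyScan and ToySave are hypothetical types invented for this sketch; column names stand in for the schema and a partition count stands in for the partitioning.

```scala
object UnaryDelegationSketch {
  // Hypothetical, heavily simplified physical operator
  trait ToyPlan {
    def output: Seq[String]   // column names standing in for the output schema
    def numPartitions: Int    // partition count standing in for the output partitioning
  }

  final case class ToyScan(output: Seq[String], numPartitions: Int) extends ToyPlan

  // A unary operator that reports exactly what its single child reports
  final case class ToySave(child: ToyPlan) extends ToyPlan {
    def output: Seq[String] = child.output
    def numPartitions: Int = child.numPartitions
  }

  def main(args: Array[String]): Unit = {
    val save = ToySave(ToyScan(Seq("window", "count"), numPartitions = 1))
    println(s"${save.output.mkString(", ")} in ${save.numPartitions} partition(s)")
  }
}
```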
Tip
|
Enable logging for StateStoreSaveExec. Refer to Logging.
|
doExecute(): RDD[InternalRow]
Note
|
doExecute is a part of SparkPlan contract to produce the result of a physical operator as an RDD of internal binary rows (i.e. InternalRow).
|
Internally, doExecute initializes metrics.
Note
|
doExecute requires that the optional outputMode is at this point defined (that should have happened when IncrementalExecution had prepared a streaming aggregation for execution).
|
doExecute executes the child physical operator and creates a StateStoreRDD with storeUpdateFunction that:
-
Generates an unsafe projection to access the key field (using keyExpressions and the output schema of child).
-
Branches off per output mode.
| Output Mode | doExecute’s Behaviour |
|---|---|
| | The number of keys stored in the state store is recorded in numUpdatedStateRows metric. |
| | Returns an iterator that filters out late aggregate rows (per watermark if defined) and stores the "young" rows in the state store (one by one, i.e. every … |
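The iterator behaviour described in the table (drop late rows when a watermark is defined, store the remaining rows one by one as they are requested) can be sketched as follows. This is a toy under assumptions, not the operator's implementation: AggRow, isLate and saveYoungRows are hypothetical, a mutable map stands in for the state store, and the lateness rule (window end at or before the watermark) is an assumption made for the example.

```scala
import scala.collection.mutable

object LateRowFilterSketch {
  // Hypothetical aggregate row: grouping key, end of its event-time window, count
  final case class AggRow(key: String, windowEndMs: Long, count: Long)

  // Assumption: a row is "late" when its window ended at or before the watermark
  def isLate(row: AggRow, watermarkMs: Long): Boolean = row.windowEndMs <= watermarkMs

  // Returns a lazy iterator: late rows are dropped (only if a watermark is defined),
  // and each remaining row is written to the store when it is pulled with next()
  def saveYoungRows(
      rows: Iterator[AggRow],
      store: mutable.Map[String, AggRow],
      watermarkMs: Option[Long]): Iterator[AggRow] = {
    val young = watermarkMs match {
      case Some(wm) => rows.filter(row => !isLate(row, wm))
      case None     => rows
    }
    young.map { row =>
      store.update(row.key, row)
      row
    }
  }

  def main(args: Array[String]): Unit = {
    val store = mutable.Map.empty[String, AggRow]
    val rows = Iterator(AggRow("w1", 5000L, 3L), AggRow("w2", 15000L, 7L))
    saveYoungRows(rows, store, Some(10000L)).foreach(println)
    println(store.keySet)
  }
}
```

Because the iterator is lazy, nothing is written to the store until a downstream consumer asks for rows, which matches the "one by one" wording in the table.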
doExecute reports an UnsupportedOperationException when executed with an invalid output mode.
Invalid output mode: [outputMode]
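The branching and the failure case can be illustrated with a small, self-contained sketch. ToyOutputMode and branchFor are hypothetical names; the toy modes only mirror the names of the streaming output modes, and the returned labels are placeholders for the real per-mode behaviour. The sketch treats an undefined mode as invalid, which is an assumption of this example.

```scala
object OutputModeDispatchSketch {
  // Hypothetical stand-ins for the streaming output modes
  sealed trait ToyOutputMode
  case object Append extends ToyOutputMode
  case object Complete extends ToyOutputMode
  case object Update extends ToyOutputMode

  // Picks a branch per output mode and rejects anything else
  def branchFor(outputMode: Option[ToyOutputMode]): String = outputMode match {
    case Some(Append)   => "append"
    case Some(Complete) => "complete"
    case Some(Update)   => "update"
    case _ =>
      throw new UnsupportedOperationException(s"Invalid output mode: $outputMode")
  }

  def main(args: Array[String]): Unit = {
    println(branchFor(Some(Complete)))
    try branchFor(None)
    catch { case e: UnsupportedOperationException => println(e.getMessage) }
  }
}
```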
StateStoreSaveExec takes the following when created:
-
Catalyst expressions for keys (as used for aggregation in groupBy operator)
-
Optional StatefulOperatorStateInfo