spark-sql-streaming-WatermarkSupport.adoc


WatermarkSupport Contract for Streaming Watermark in Unary Physical Operators

WatermarkSupport is the contract for unary physical operators (i.e. UnaryExecNode) with streaming watermark support.

Note

Watermark (aka "allowed lateness") is a moving event-time threshold that determines which data is still considered for aggregations. Data with an event time older than the threshold is late, so the engine can automatically drop incoming late data and clean up old state accordingly.

See Handling Late Data and Watermarking in the official Spark documentation.
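To make the "moving threshold" idea concrete, here is a minimal sketch in plain Scala. It is a simplified model and not Spark code: the object `WatermarkSketch` and its methods are hypothetical names, and it assumes the watermark is the maximum event time seen so far minus the delay, never moving backwards.

```scala
// Simplified model of a moving watermark; all names are hypothetical, not Spark APIs.
object WatermarkSketch {
  // New watermark after a batch: max event time in the batch minus the delay,
  // but never lower than the current watermark.
  def advance(currentMs: Long, batchEventTimesMs: Seq[Long], delayMs: Long): Long =
    if (batchEventTimesMs.isEmpty) currentMs
    else math.max(currentMs, batchEventTimesMs.max - delayMs)

  // A row counts as late when its event time is at or below the watermark.
  def isLate(eventTimeMs: Long, watermarkMs: Long): Boolean =
    eventTimeMs <= watermarkMs
}
```

With a 5-second delay, a batch whose newest event is at 12000 ms moves the watermark to 7000 ms; a later batch with older events leaves it unchanged.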

Table 1. WatermarkSupport’s (Lazily-Initialized) Properties
Property Description

watermarkExpression

Optional Catalyst expression that matches rows older than the event time watermark.

Note
Use withWatermark operator to specify streaming watermark.

When initialized, watermarkExpression searches the child's output for the attribute whose metadata contains the spark.watermarkDelayMs key (the watermark attribute).

If found, watermarkExpression creates evictionExpression, which matches rows whose watermark attribute is less than or equal to eventTimeWatermark.

The watermark attribute may be of type StructType (e.g. a time window). If it is, watermarkExpression uses the struct's field at index 1 (the end of the window) as the watermark.

watermarkExpression prints out the following INFO message to the logs when the spark.watermarkDelayMs watermark attribute is found:

INFO [physicalOperator]Exec: Filtering state store on: [evictionExpression]
Tip
Enable INFO logging level for one of the stateful physical operators to see the INFO message in the logs.

watermarkPredicateForData

Optional Predicate that uses watermarkExpression and the child output to match rows older than the watermark.

watermarkPredicateForKeys

Optional Predicate that uses keyExpressions to match rows older than the event time watermark.
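The lookup-then-compare steps described for watermarkExpression can be sketched in plain Scala. This is an illustrative model only: `Attr`, `EvictionSketch` and `Row` are hypothetical stand-ins for Catalyst attributes, expressions and rows, and the sketch assumes all times share one unit and that a missing watermark simply yields no predicate.

```scala
// Hypothetical stand-in for a Catalyst attribute: a name plus its metadata keys.
final case class Attr(name: String, metadataKeys: Set[String])

object EvictionSketch {
  val DelayKey = "spark.watermarkDelayMs"

  // Row model: attribute name -> event time (same unit as the watermark).
  type Row = Map[String, Long]

  // Finds the attribute tagged with DelayKey and builds a
  // "value <= watermark" test; None when there is no such attribute
  // or no watermark (an assumption of this sketch).
  def watermarkPredicate(
      output: Seq[Attr],
      eventTimeWatermark: Option[Long]): Option[Row => Boolean] =
    for {
      attr <- output.find(_.metadataKeys.contains(DelayKey))
      watermark <- eventTimeWatermark
    } yield { (row: Row) => row.get(attr.name).exists(_ <= watermark) }
}
```

Returning an Option mirrors the "optional" nature of the three properties in the table: an operator whose child carries no watermark attribute has nothing to evict on.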

WatermarkSupport Contract

package org.apache.spark.sql.execution.streaming

trait WatermarkSupport extends UnaryExecNode {
  // only required methods that have no implementation
  def eventTimeWatermark: Option[Long]
  def keyExpressions: Seq[Attribute]
}
Table 2. WatermarkSupport Contract
Method Description

eventTimeWatermark

Used mainly in watermarkExpression to create a LessThanOrEqual Catalyst binary expression that matches rows older than the watermark.

keyExpressions

Grouping keys (in FlatMapGroupsWithStateExec), duplicate keys (in StreamingDeduplicateExec) or key attributes (in StateStoreSaveExec). At most one of them may have the spark.watermarkDelayMs watermark attribute in its metadata.

Used in watermarkPredicateForKeys to create a Predicate to match rows older than the event time watermark.

Used also when StateStoreSaveExec and StreamingDeduplicateExec physical operators are executed.
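As a rough illustration of the contract's shape, the following miniature shows an implementor supplying the two required members while a derived property is computed lazily on first access. `KeyAttr`, `MiniWatermarkSupport` and `MiniDeduplicate` are hypothetical names invented for this sketch and are not Spark classes.

```scala
// Hypothetical key attribute: a name and whether it carries the watermark delay metadata.
final case class KeyAttr(name: String, hasWatermarkDelay: Boolean)

// Miniature of the contract; not the Spark trait.
trait MiniWatermarkSupport {
  // The two members an implementor must supply.
  def eventTimeWatermark: Option[Long]
  def keyExpressions: Seq[KeyAttr]

  // Lazily derived from keyExpressions, so it is evaluated only after
  // the implementing class has been fully constructed.
  lazy val watermarkKeyName: Option[String] =
    keyExpressions.find(_.hasWatermarkDelay).map(_.name)
}

// A hypothetical operator that fulfils the contract with constructor values.
final class MiniDeduplicate(
    val eventTimeWatermark: Option[Long],
    val keyExpressions: Seq[KeyAttr]) extends MiniWatermarkSupport
```

One plausible reason for lazy initialization in a trait like this is that the derived value depends on members that only the implementing class defines.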

Removing Keys Older Than Watermark From StateStore — removeKeysOlderThanWatermark Internal Method

removeKeysOlderThanWatermark(store: StateStore): Unit

removeKeysOlderThanWatermark requests the input store for all of its rows.

removeKeysOlderThanWatermark then removes from the store every row whose key matches watermarkPredicateForKeys (if defined).
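The two steps above can be sketched against an in-memory map. This is a simplified model under stated assumptions: a `mutable.Map` stands in for the StateStore, a key is modeled as a (grouping key, event time) pair, and `RemoveKeysSketch` is a hypothetical name.

```scala
import scala.collection.mutable

object RemoveKeysSketch {
  // Hypothetical key model: (groupingKey, eventTimeMs).
  type Key = (String, Long)

  // Removes every entry whose key matches the predicate; does nothing
  // when no predicate is defined.
  def removeKeysOlderThanWatermark(
      store: mutable.Map[Key, Long],
      watermarkPredicateForKeys: Option[Key => Boolean]): Unit =
    watermarkPredicateForKeys.foreach { isOld =>
      // Snapshot the keys first so removal does not disturb iteration.
      store.keys.toList.filter(isOld).foreach(k => store.remove(k))
    }
}
```

For example, with a predicate that matches event times at or below 7000, a store holding keys `("a", 5000)` and `("b", 9000)` keeps only the second entry.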

Note

removeKeysOlderThanWatermark is used when: