Trigger
Trigger defines how frequently a streaming query is executed and therefore emits new data. StreamExecution uses the trigger to resolve a TriggerExecutor.
Note: A trigger can also be called a batch interval (as in the older Spark Streaming).
Trigger | Creating Instance |
---|---|
OnceTrigger | `Trigger.Once` |
ProcessingTime | `ProcessingTime(...)` or `ProcessingTime.create(...)` (see ProcessingTime below) |
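StreamExecution resolves the trigger to a TriggerExecutor with a plain pattern match and rejects any Trigger subtype it does not know (see the IllegalStateException example further down). A minimal, illustrative sketch of that resolution, using hypothetical stand-in executor types since the real executors are internal to Spark:

```scala
import org.apache.spark.sql.streaming.{ProcessingTime, Trigger}

// Hypothetical stand-ins for Spark's internal TriggerExecutor types,
// here only to illustrate the resolution step.
sealed trait ExecutorSketch
case class ProcessingTimeExecutorSketch(interval: ProcessingTime) extends ExecutorSketch
case object OneTimeExecutorSketch extends ExecutorSketch

def resolveExecutor(trigger: Trigger): ExecutorSketch = trigger match {
  case t: ProcessingTime        => ProcessingTimeExecutorSketch(t) // periodic micro-batches
  case t if t == Trigger.Once() => OneTimeExecutorSketch           // one batch, then stop
  case _ =>
    throw new IllegalStateException(s"Unknown type of trigger: $trigger")
}
```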
Note: You specify the trigger for a streaming query using DataStreamWriter's trigger method.
```scala
import org.apache.spark.sql.streaming.Trigger

val query = spark.
  readStream.
  format("rate").
  load.
  writeStream.
  format("console").
  option("truncate", false).
  trigger(Trigger.Once). // <-- execute once and stop
  queryName("rate-once").
  start
scala> query.isActive
res0: Boolean = false
scala> println(query.lastProgress)
{
"id" : "2ae4b0a4-434f-4ca7-a523-4e859c07175b",
"runId" : "24039ce5-906c-4f90-b6e7-bbb3ec38a1f5",
"name" : "rate-once",
"timestamp" : "2017-07-04T18:39:35.998Z",
"numInputRows" : 0,
"processedRowsPerSecond" : 0.0,
"durationMs" : {
"addBatch" : 1365,
"getBatch" : 29,
"getOffset" : 0,
"queryPlanning" : 285,
"triggerExecution" : 1742,
"walCommit" : 40
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "RateSource[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=8]",
"startOffset" : null,
"endOffset" : 0,
"numInputRows" : 0,
"processedRowsPerSecond" : 0.0
} ],
"sink" : {
"description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@7dbf277"
}
}
```
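For comparison, replacing Trigger.Once with a processing-time trigger makes the query run a new batch on the given interval until it is stopped. A minimal sketch of the same rate query with a 10-second trigger (the query name rate-10s is made up for this example):

```scala
import org.apache.spark.sql.streaming.Trigger

val query = spark.
  readStream.
  format("rate").
  load.
  writeStream.
  format("console").
  option("truncate", false).
  trigger(Trigger.ProcessingTime("10 seconds")). // <-- new batch every 10 seconds
  queryName("rate-10s").
  start
```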
Note: Although Trigger allows for custom implementations, StreamExecution refuses such attempts and reports an IllegalStateException.
```scala
case object MyTrigger extends Trigger

scala> val query = spark.
  readStream.
  format("rate").
  load.
  writeStream.
  format("console").
  option("truncate", false).
  trigger(MyTrigger). // <-- use custom trigger
  queryName("rate-once").
  start
java.lang.IllegalStateException: Unknown type of trigger: MyTrigger
at org.apache.spark.sql.execution.streaming.StreamExecution.<init>(StreamExecution.scala:178)
at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:240)
at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:278)
at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:284)
... 57 elided
```
Note: Trigger was introduced in the commit for [SPARK-14176][SQL] Add DataFrameWriter.trigger to set the stream batch period.
ProcessingTime

ProcessingTime is a Trigger that assumes that milliseconds is the minimum time unit.

You can create an instance of ProcessingTime using the following constructors:
- `ProcessingTime(Long)` that accepts non-negative values that represent milliseconds, e.g. `ProcessingTime(10)`
- `ProcessingTime(interval: String)` or `ProcessingTime.create(interval: String)` that accept `CalendarInterval` instances with or without the leading `interval` string, e.g. `ProcessingTime("10 milliseconds")` or `ProcessingTime("interval 10 milliseconds")`
- `ProcessingTime(Duration)` that accepts `scala.concurrent.duration.Duration` instances, e.g. `ProcessingTime(10.seconds)`
- `ProcessingTime.create(interval: Long, unit: TimeUnit)` for `Long` and `java.util.concurrent.TimeUnit` instances, e.g. `ProcessingTime.create(10, TimeUnit.SECONDS)`
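Putting the constructors above together, the following sketch creates a ProcessingTime instance with each of them (assuming the imports shown and a build where ProcessingTime is still available directly):

```scala
import java.util.concurrent.TimeUnit
import scala.concurrent.duration._
import org.apache.spark.sql.streaming.ProcessingTime

ProcessingTime(10)                           // milliseconds
ProcessingTime("10 milliseconds")            // interval string
ProcessingTime("interval 10 milliseconds")   // with the leading "interval"
ProcessingTime(10.seconds)                   // scala.concurrent.duration.Duration
ProcessingTime.create(10, TimeUnit.SECONDS)  // Long + java.util.concurrent.TimeUnit
```

Any of these can then be passed to DataStreamWriter's trigger method, e.g. trigger(ProcessingTime(10.seconds)).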