
DataStreamWriter — Writing Datasets To Streaming Data Sinks

DataStreamWriter is the interface to describe how the result of executing (batches of) a streaming query is written to a streaming sink.

Note

A streaming query is a Dataset with a streaming logical plan.

scala> spark.version
res0: String = 2.3.0-SNAPSHOT

import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._
import org.apache.spark.sql.DataFrame
val rates: DataFrame = spark.
  readStream.
  format("rate").
  load

scala> rates.isStreaming
res1: Boolean = true

scala> rates.queryExecution.logical.isStreaming
res2: Boolean = true

DataStreamWriter is available through the writeStream method of a streaming Dataset.

import org.apache.spark.sql.Dataset
import org.apache.spark.sql.streaming.DataStreamWriter

val streamingQuery: Dataset[Long] = ...

scala> streamingQuery.isStreaming
res0: Boolean = true

// writeStream keeps the element type of the Dataset (Long here, not Row)
val writer: DataStreamWriter[Long] = streamingQuery.writeStream

Like the batch DataFrameWriter, DataStreamWriter has direct support for many file formats and an extension point to plug in new formats.

// see above for writer definition

// Save dataset in JSON format
writer.format("json")

Finally, you start the continuous writing of the result of executing a Dataset to a sink using the start operator.

// start, not save: file sinks like json also need a path (start(path)) and a checkpointLocation option
writer.start
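Putting format and start together, here is a minimal end-to-end sketch, assuming a local SparkSession (in spark-shell, spark already exists) and the built-in rate and console formats. It shows that writeStream only describes the sink: nothing runs until start returns a StreamingQuery, which is then used to wait for and stop the query.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQuery

// Assumption: a local SparkSession; getOrCreate reuses spark-shell's session if present
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("DataStreamWriter demo")
  .getOrCreate()

// The rate source emits rows with `timestamp` and `value` columns
val rates = spark.readStream
  .format("rate")
  .option("rowsPerSecond", 1L)
  .load()

// writeStream only describes the sink; start launches the continuous query
val query: StreamingQuery = rates.writeStream
  .format("console")
  .start()

// Let the query run for about 5 seconds, then stop it
query.awaitTermination(5000)
query.stop()
```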

Besides format and start, DataStreamWriter offers the following methods to configure how a streaming Dataset is written.

Table 1. DataStreamWriter’s Methods
Method | Description
format | Specifies the format of the output, i.e. the output data source, which indirectly specifies the streaming sink to write the rows to. Internally, format is referred to as source (as in the output data source). Recognized "special" output data sources (in the code) include hive, memory and foreach.
foreach | Sets a ForeachWriter to take full control of streaming writes.
option | Sets a single output option.
options | Sets multiple output options at once.
outputMode | Specifies the output mode.
partitionBy | Partitions the output by the given columns.
queryName | Assigns the name of a query.
trigger | Sets the Trigger for how often a streaming query should be executed and the result saved.

Note
hive is not supported for streaming writes (and leads to an AnalysisException).
Note
DataFrameWriter is responsible for writing in a batch fashion.
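As a sketch of several of these methods used together, the following writes a streaming Dataset to the parquet file sink, partitioned by a derived column. It assumes a local SparkSession; the bucket column and the /tmp/dsw-demo directories are hypothetical, chosen only for illustration. File sinks need an output path (given here through start(path)) and, unless a default is configured, a checkpointLocation option.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// Hypothetical low-cardinality column to partition the output by
val rates = spark.readStream.format("rate").load()
  .withColumn("bucket", col("value") % 3)

// Hypothetical scratch directories for the output and the checkpoint
val query = rates.writeStream
  .format("parquet")
  .partitionBy("bucket")
  .option("checkpointLocation", "/tmp/dsw-demo/checkpoint")
  .start("/tmp/dsw-demo/output")

query.awaitTermination(10000)
query.stop()
```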
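Table 2. DataStreamWriter’s Internal Properties (in alphabetical order)
Name | Initial Value | Description
extraOptions | | Output options; set using option and options methods.
foreachWriter | | ForeachWriter to write rows with; set using foreach method.
outputMode | OutputMode.Append | OutputMode of the streaming sink; set using outputMode method.
partitioningColumns | | Columns to partition the output by; set using partitionBy method.
source | | Output data source (format); set using format method.
trigger | ProcessingTime(0L) | Trigger of the streaming query; set using trigger method.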
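To make the role of these internal properties concrete, here is a hypothetical, heavily simplified model in plain Scala (no Spark needed). SketchStreamWriter and triggerEvery are invented names and this is not Spark's code; the sketch only mirrors how the builder methods record state in the properties above, with the defaults Append and ProcessingTime(0L).

```scala
import scala.collection.mutable

// Hypothetical sketch, NOT Spark's implementation: the fields mirror Table 2,
// with types reduced to plain Scala values
final class SketchStreamWriter {
  private var sourceName: Option[String] = None        // set by format
  private var mode: String = "append"                   // default output mode: Append
  private var triggerIntervalMs: Long = 0L              // default trigger: ProcessingTime(0L)
  private var partitionCols: Seq[String] = Seq.empty    // set by partitionBy
  private val extraOptions = mutable.LinkedHashMap.empty[String, String]

  def format(source: String): this.type = { sourceName = Some(source); this }

  // Typed overloads all store their value as a String
  def option(key: String, value: String): this.type = { extraOptions(key) = value; this }
  def option(key: String, value: Boolean): this.type = option(key, value.toString)
  def option(key: String, value: Long): this.type = option(key, value.toString)

  // Output mode names are accepted case-insensitively
  def outputMode(name: String): this.type = {
    val normalized = name.toLowerCase
    require(Set("append", "complete", "update")(normalized), s"Unknown output mode: $name")
    mode = normalized
    this
  }

  // queryName is just one more option under the key "queryName"
  def queryName(name: String): this.type = option("queryName", name)

  // Hypothetical stand-in for trigger(Trigger.ProcessingTime(...))
  def triggerEvery(intervalMs: Long): this.type = { triggerIntervalMs = intervalMs; this }

  def partitionBy(cols: String*): this.type = { partitionCols = cols; this }

  // Read-only view of the accumulated state
  def describe: Map[String, String] = Map(
    "source" -> sourceName.getOrElse("<default>"),
    "outputMode" -> mode,
    "triggerIntervalMs" -> triggerIntervalMs.toString,
    "partitioningColumns" -> partitionCols.mkString(","),
    "options" -> extraOptions.iterator.map { case (k, v) => s"$k=$v" }.mkString(",")
  )
}
```

For example, new SketchStreamWriter().format("console").queryName("rates").option("truncate", false) leaves the output mode at append and records both queryName and truncate as string options.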

Specifying Write Option — option Method

option(key: String, value: String): DataStreamWriter[T]
option(key: String, value: Boolean): DataStreamWriter[T]
option(key: String, value: Long): DataStreamWriter[T]
option(key: String, value: Double): DataStreamWriter[T]

Internally, option adds the key and value to the extraOptions internal option registry (non-String values are converted to strings).
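A minimal sketch of option and options with the console sink, assuming a local SparkSession. truncate and numRows are options understood by the console sink; the checkpoint directory is a hypothetical scratch path.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
val rates = spark.readStream.format("rate").load()

val query = rates.writeStream
  .format("console")
  // Typed overloads: the Boolean and Long values are stored as strings
  .option("truncate", false)
  .option("numRows", 5L)
  // options sets several key-value pairs at once (hypothetical path)
  .options(Map("checkpointLocation" -> "/tmp/dsw-demo/console-checkpoint"))
  .start()

query.awaitTermination(5000)
query.stop()
```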

Specifying Output Mode — outputMode Method

outputMode(outputMode: String): DataStreamWriter[T]
outputMode(outputMode: OutputMode): DataStreamWriter[T]

outputMode specifies the output mode of a streaming Dataset.

Note
When not specified explicitly, Append is the default output mode.

outputMode accepts either a name (append, complete or update, case-insensitive) or a typed OutputMode.

Note
Output mode describes what data is written to a streaming sink when there is new data available in streaming data sources.
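The sketch below, assuming a local SparkSession, uses the typed OutputMode.Complete() on a streaming aggregation; outputMode("complete") is the name-based equivalent. With the default Append mode this query would be rejected, since Append is not supported for streaming aggregations without a watermark.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.streaming.OutputMode

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// Streaming aggregation: running row count per parity of `value`
val counts = spark.readStream.format("rate").load()
  .groupBy((col("value") % 2).as("parity"))
  .count()

// Complete mode re-emits the whole result table after every batch
val query = counts.writeStream
  .format("console")
  .outputMode(OutputMode.Complete())
  .start()

query.awaitTermination(5000)
query.stop()
```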

Setting Query Name — queryName method

queryName(queryName: String): DataStreamWriter[T]

queryName sets the name of a streaming query.

Internally, it is just an additional option with the key queryName.
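queryName has a visible effect with the memory sink, which requires a query name and uses it as the name of an in-memory table. A sketch, assuming a local SparkSession; rates_table is an arbitrary name chosen for the example.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
val rates = spark.readStream.format("rate").load()

// For the memory sink, the query name doubles as the in-memory table name
val query = rates.writeStream
  .format("memory")
  .queryName("rates_table")
  .start()

query.awaitTermination(3000)

// The name is available on the StreamingQuery handle...
println(query.name)
// ...and the rows collected so far can be queried like any table
spark.table("rates_table").show()

query.stop()
```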

Setting How Often to Execute Streaming Query — trigger method

trigger(trigger: Trigger): DataStreamWriter[T]

The trigger method sets the Trigger (which executes a batch runner) of a streaming query, e.g. the time interval between batches.

Note
Trigger specifies how often results should be produced by a StreamingQuery. See Trigger.

The default trigger is ProcessingTime(0L), which runs a streaming query as often as possible, starting the next batch as soon as the previous one finishes.

Tip
Consult Trigger to learn about Trigger and ProcessingTime types.
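A sketch of overriding the default trigger with a fixed processing-time interval, assuming a local SparkSession. Trigger.ProcessingTime also accepts an interval string such as "2 seconds".

```scala
import scala.concurrent.duration._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder().master("local[*]").getOrCreate()
val rates = spark.readStream.format("rate").load()

// Start a micro-batch every 2 seconds instead of as often as possible
val query = rates.writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime(2.seconds))
  .start()

query.awaitTermination(7000)
query.stop()
```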

Starting Continuous Writing to Sink — start Method

start(): StreamingQuery
start(path: String): StreamingQuery  // (1)
  1. Sets path option to path and passes the call on to start()

start starts a streaming query and returns a StreamingQuery to control the execution of the continuous query.

Note
Whether or not you have to specify path option depends on the streaming sink in use.

Internally, start branches off per source.

  • memory

  • foreach

  • other formats

…FIXME

Table 3. start’s Options
Option | Description
queryName | Name of active streaming query
checkpointLocation | Directory for checkpointing (and to store query metadata like offsets before and after being processed, the query id, etc.)

start reports an AnalysisException when source is hive.

val q = spark.
  readStream.
  text("server-logs/*").
  writeStream.
  format("hive")  // <-- hive format used as a streaming sink

scala> q.start
org.apache.spark.sql.AnalysisException: Hive data source can only be used with tables, you can not write files of Hive data source directly.;
  at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:234)
  ... 48 elided
Note
Define options using option or options methods.

Making ForeachWriter in Charge of Streaming Writes — foreach method

foreach(writer: ForeachWriter[T]): DataStreamWriter[T]

foreach sets the input ForeachWriter to be in control of streaming writes.

Internally, foreach sets the streaming output format as foreach and foreachWriter as the input writer.

Note
foreach uses the SparkSession to access the SparkContext and clean (closure-clean) the ForeachWriter.
Note

foreach reports an IllegalArgumentException with the following message when writer is null:

foreach writer cannot be null
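A minimal ForeachWriter sketch, assuming a local SparkSession and the rate source. PrintlnWriter is a hypothetical name. Returning true from open tells Spark to call process for the rows of that partition, and close is called at the end with the error, if any. Because the writer runs on executors, the println output goes to executor logs (the console in local mode).

```scala
import org.apache.spark.sql.{ForeachWriter, Row, SparkSession}

val spark = SparkSession.builder().master("local[*]").getOrCreate()
val rates = spark.readStream.format("rate").load()

// Hypothetical writer that prints every row's `value` column
class PrintlnWriter extends ForeachWriter[Row] {
  // true means "process the rows of this partition in this batch"
  override def open(partitionId: Long, version: Long): Boolean = true
  override def process(row: Row): Unit = println(s"value=${row.getAs[Long]("value")}")
  // errorOrNull is null when the partition was processed successfully
  override def close(errorOrNull: Throwable): Unit = ()
}

// foreach sets the output format to foreach and installs the writer
val query = rates.writeStream
  .foreach(new PrintlnWriter)
  .start()

query.awaitTermination(5000)
query.stop()
```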