Skip to content

Latest commit

 

History

History
161 lines (122 loc) · 4.77 KB

spark-sql-streaming-MemoryStream.adoc

File metadata and controls

161 lines (122 loc) · 4.77 KB

MemoryStream

MemoryStream is a streaming Source that produces values to memory.

MemoryStream uses the internal batches collection of datasets.

Caution

This source is not for production use due to design contraints, e.g. infinite in-memory collection of lines read and no fault recovery.

MemoryStream is designed primarily for unit tests, tutorials and debugging.

val spark: SparkSession = ???

implicit val ctx = spark.sqlContext

import org.apache.spark.sql.execution.streaming.MemoryStream
// It uses two implicits: Encoder[Int] and SQLContext
val intsIn = MemoryStream[Int]

val ints = intsIn.toDF
  .withColumn("t", current_timestamp())
  .withWatermark("t", "5 minutes")
  .groupBy(window($"t", "5 minutes") as "window")
  .agg(count("*") as "total")

import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import scala.concurrent.duration._
val totalsOver5mins = ints.
  writeStream.
  format("memory").
  queryName("totalsOver5mins").
  outputMode(OutputMode.Append).
  trigger(Trigger.ProcessingTime(10.seconds)).
  start

scala> val zeroOffset = intsIn.addData(0, 1, 2)
zeroOffset: org.apache.spark.sql.execution.streaming.Offset = #0

totalsOver5mins.processAllAvailable()
spark.table("totalsOver5mins").show

scala> intsOut.show
+-----+
|value|
+-----+
|    0|
|    1|
|    2|
+-----+

memoryQuery.stop()
17/02/28 20:06:01 DEBUG StreamExecution: Starting Trigger Calculation
17/02/28 20:06:01 DEBUG StreamExecution: getOffset took 0 ms
17/02/28 20:06:01 DEBUG StreamExecution: triggerExecution took 0 ms
17/02/28 20:06:01 DEBUG StreamExecution: Execution stats: ExecutionStats(Map(),List(),Map(watermark -> 1970-01-01T00:00:00.000Z))
17/02/28 20:06:01 INFO StreamExecution: Streaming query made progress: {
  "id" : "ec5addda-0e46-4c3c-b2c2-604a854ee19a",
  "runId" : "d850cabc-94d0-4931-8a2d-e054086e39c3",
  "name" : "totalsOver5mins",
  "timestamp" : "2017-02-28T19:06:01.175Z",
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "durationMs" : {
    "getOffset" : 0,
    "triggerExecution" : 0
  },
  "eventTime" : {
    "watermark" : "1970-01-01T00:00:00.000Z"
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "MemoryStream[value#1]",
    "startOffset" : null,
    "endOffset" : null,
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "MemorySink"
  }
}
Tip

Enable DEBUG logging level for org.apache.spark.sql.execution.streaming.MemoryStream logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.execution.streaming.MemoryStream=DEBUG

Refer to Logging.

Creating MemoryStream Instance

apply[A : Encoder](implicit sqlContext: SQLContext): MemoryStream[A]

MemoryStream object defines apply method that you can use to create instances of MemoryStream streaming sources.

Adding Data to Source (addData methods)

addData(data: A*): Offset
addData(data: TraversableOnce[A]): Offset

addData methods add the input data to batches internal collection.

When executed, addData adds a DataFrame (created using toDS implicit method) and increments the internal currentOffset offset.

You should see the following DEBUG message in the logs:

DEBUG MemoryStream: Adding ds: [ds]

Generating Next Streaming Batch — getBatch Method

Note
getBatch is a part of Streaming Source contract.

When executed, getBatch uses the internal batches collection to return requested offsets.

You should see the following DEBUG message in the logs:

DEBUG MemoryStream: MemoryBatch [[startOrdinal], [endOrdinal]]: [newBlocks]

StreamingExecutionRelation Logical Plan

MemoryStream uses StreamingExecutionRelation logical plan to build Datasets or DataFrames when requested.

scala> val ints = MemoryStream[Int]
ints: org.apache.spark.sql.execution.streaming.MemoryStream[Int] = MemoryStream[value#13]

scala> ints.toDS.queryExecution.logical.isStreaming
res14: Boolean = true

scala> ints.toDS.queryExecution.logical
res15: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan = MemoryStream[value#13]

Schema (schema method)

MemoryStream works with the data of the schema as described by the Encoder (of the Dataset).