KafkaSink is a streaming sink that KafkaSourceProvider registers as the kafka format.
// start spark-shell or a Spark application with spark-sql-kafka-0-10 module
// spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0-SNAPSHOT
import org.apache.spark.sql.SparkSession
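val spark: SparkSession = ...
import spark.implicits._ // <-- needed for as[String]
spark.
  readStream.
  format("text").
  load("server-logs/*.out").
  as[String].
  writeStream.
  queryName("server-logs processor").
  format("kafka"). // <-- uses KafkaSink
  option("kafka.bootstrap.servers", "localhost:9092"). // <-- mandatory; assumes a broker at localhost:9092
  option("topic", "topic1").
  option("checkpointLocation", "/tmp/kafka-sink-checkpoint"). // <-- mandatory
  start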
// in another terminal
$ echo hello > server-logs/hello.out
// in the terminal with Spark
FIXME
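The example works because the Kafka sink expects every row to carry a value column of string or binary type (the text source produces exactly one string column named value), an optional key column of string or binary type, and either the topic option or a string topic column. Below is a minimal, hypothetical Scala model of those checks, roughly what KafkaWriter validates in Spark 2.x. The KafkaInputShape object and its simplified type names are illustrative assumptions, not Spark API.

```scala
// Hypothetical, self-contained model of the input checks the Kafka sink applies
// before writing. Names and the simplified type strings are illustrative only.
object KafkaInputShape {
  // Column name -> simplified type name ("string", "binary", "int", ...)
  type Schema = Map[String, String]

  private val stringOrBinary = Set("string", "binary")

  /** Returns the problems found; an empty list means the rows can be written. */
  def problems(schema: Schema, topicOption: Option[String]): List[String] = {
    // The topic option takes precedence; otherwise a string "topic" column is required.
    val topic: Option[String] =
      if (topicOption.isDefined) None
      else schema.get("topic") match {
        case Some("string") => None
        case Some(other)    => Some(s"topic column must be string, not $other")
        case None           => Some("topic option required when no topic column is present")
      }

    // The "key" column is optional, but must be string or binary when present.
    val key: Option[String] =
      schema.get("key").filterNot(stringOrBinary).map(t => s"key column must be string or binary, not $t")

    // The "value" column is mandatory and must be string or binary.
    val value: Option[String] = schema.get("value") match {
      case None                          => Some("required value column not found")
      case Some(t) if !stringOrBinary(t) => Some(s"value column must be string or binary, not $t")
      case _                             => None
    }

    List(topic, key, value).flatten
  }
}
```

For the query above, KafkaInputShape.problems(Map("value" -> "string"), Some("topic1")) reports no problems; dropping the topic option without adding a topic column would make the real sink fail analysis instead.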
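KafkaSink takes the following when created: a SQLContext, executorKafkaParams (the Kafka producer parameters used on executors), and an optional topic.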
addBatch(batchId: Long, data: DataFrame): Unit
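Internally, addBatch requests KafkaWriter to write the input data to the topic, if one was given as the topic option, or otherwise to the topic named in each row's topic column, using executorKafkaParams as the Kafka producer configuration.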
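In the Spark 2.x sources, addBatch also remembers the highest batch id it has written and skips any batch whose id is not greater, so a batch replayed after a restart from the checkpoint is not written to Kafka twice. The following is a hypothetical, Spark-free Scala sketch of that behaviour together with the topic resolution. SketchKafkaSink, Record and the in-memory sent buffer are illustrative stand-ins (there is no real producer and no executorKafkaParams here), not Spark API.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical record: an optional per-row topic plus the value to send.
final case class Record(topic: Option[String], value: String)

// Hypothetical model of KafkaSink.addBatch; plain Scala, no Spark or Kafka involved.
final class SketchKafkaSink(topicOption: Option[String]) {
  // Highest batch id already written (Spark's KafkaSink keeps a similar latestBatchId).
  private var latestBatchId = -1L

  // Stand-in for the Kafka cluster: (topic, value) pairs "sent" so far.
  val sent: ArrayBuffer[(String, String)] = ArrayBuffer.empty

  def addBatch(batchId: Long, data: Seq[Record]): Unit =
    if (batchId <= latestBatchId) {
      // Already committed batch (e.g. replayed after a restart): skip it.
      ()
    } else {
      data.foreach { r =>
        // The topic option, when set, wins; otherwise the record's own topic is used.
        val topic = topicOption.orElse(r.topic).getOrElse(
          throw new IllegalArgumentException("no topic option and no topic in record"))
        sent += (topic -> r.value)
      }
      latestBatchId = batchId
    }
}
```

Calling addBatch(0, …) twice with the same id writes the records only once, which is the design choice that lets the sink cooperate with checkpoint-based recovery.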
Note: addBatch is part of the Sink contract.