Spark Connector Examples for Pravega

A collection of code examples that demonstrate the capabilities of Pravega as a data stream storage system for Apache Spark.

Getting Started

Install Operating System

Install Ubuntu 18.04 LTS. Other operating systems can also be used, but the commands below have only been tested on this version.

Install Java 11

sudo apt-get install openjdk-11-jdk

You may have multiple versions of Java installed. Ensure that Java 11 is the default with the command below.

sudo update-alternatives --config java

Run Pravega

This will run a development instance of Pravega locally.

cd
git clone https://github.com/pravega/pravega
cd pravega
git checkout r0.9
./gradlew startStandalone

Install Apache Spark

This will install a development instance of Spark locally.

Download https://www.apache.org/dyn/closer.lua/spark/spark-3.0.2/spark-3.0.2-bin-hadoop2.7.tgz.

mkdir -p ~/spark
cd ~/spark
tar -xzvf ~/Downloads/spark-3.0.2-bin-hadoop2.7.tgz
ln -s spark-3.0.2-bin-hadoop2.7 current
export PATH="$HOME/spark/current/bin:$PATH"

By default, the script run_spark_app.sh will use an in-process Spark mini-cluster that is started with the Spark job (--master local[2]).

Run Examples

Run a PySpark batch job that reads events from the file sample_data.json and writes to a Pravega stream

./run_pyspark_app.sh src/main/python/batch_file_to_pravega.py

The file sample_data.json has the following contents:

{"id": 1, "key": "a", "message": "Hello world."}
{"id": 2, "key": "b", "message": "Welcome to Pravega!"}
{"id": 3, "key": "a", "message": "Another message."}
{"id": 4, "key": "c", "message": "Another message."}
{"id": 5, "key": "c", "message": "Another message."}

You should see output similar to the following:

+--------------------------------------------------+-----------+
|event                                             |routing_key|
+--------------------------------------------------+-----------+
|{"id":1,"key":"a","message":"Hello world."}       |a          |
|{"id":2,"key":"b","message":"Welcome to Pravega!"}|b          |
|{"id":3,"key":"a","message":"Another message."}   |a          |
|{"id":4,"key":"c","message":"Another message."}   |c          |
|{"id":5,"key":"c","message":"Another message."}   |c          |
+--------------------------------------------------+-----------+
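The mapping from input lines to the two output columns can be sketched in plain Python. This is not the code in batch_file_to_pravega.py, which runs on Spark. It only assumes, based on the output above, that each event is the compact JSON form of the input record and that the routing key is taken from the key field. The function name to_event_and_routing_key is hypothetical.

```python
import json

# The five lines of sample_data.json, copied from above.
SAMPLE_LINES = [
    '{"id": 1, "key": "a", "message": "Hello world."}',
    '{"id": 2, "key": "b", "message": "Welcome to Pravega!"}',
    '{"id": 3, "key": "a", "message": "Another message."}',
    '{"id": 4, "key": "c", "message": "Another message."}',
    '{"id": 5, "key": "c", "message": "Another message."}',
]


def to_event_and_routing_key(line):
    """Return (event, routing_key) for one JSON line (hypothetical helper)."""
    record = json.loads(line)
    # Compact separators drop the spaces after ':' and ',' as in the output table.
    event = json.dumps(record, separators=(",", ":"))
    # Assumption: the routing key is the value of the "key" field.
    return event, record["key"]


rows = [to_event_and_routing_key(line) for line in SAMPLE_LINES]
for event, routing_key in rows:
    print(f"{event} | {routing_key}")
```

Records 1 and 3 share the routing key a, and records 4 and 5 share the routing key c.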

Run a PySpark batch job that reads from a Pravega stream and writes to the console

./run_pyspark_app.sh src/main/python/batch_pravega_to_console.py

You should see output similar to the following:

+--------------------------------------------------+--------+------------+----------+------+
|event                                             |scope   |stream      |segment_id|offset|
+--------------------------------------------------+--------+------------+----------+------+
|{"id":4,"key":"c","message":"Another message."}   |examples|batchstream1|1         |0     |
|{"id":5,"key":"c","message":"Another message."}   |examples|batchstream1|1         |55    |
|{"id":1,"key":"a","message":"Hello world."}       |examples|batchstream1|4         |0     |
|{"id":2,"key":"b","message":"Welcome to Pravega!"}|examples|batchstream1|4         |51    |
|{"id":3,"key":"a","message":"Another message."}   |examples|batchstream1|4         |109   |
+--------------------------------------------------+--------+------------+----------+------+
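The segment_id and offset columns follow a pattern that a few lines of Python can reproduce. Events that share a routing key are written to the same segment, so both events with key c appear together in segment 1. The offsets shown above are consistent with each event occupying its UTF-8 length plus 8 bytes of framing. That 8-byte figure is inferred from this sample output, and the names EVENT_HEADER_BYTES and segment_offsets are hypothetical.

```python
# Assumption inferred from the sample output: 8 bytes of framing per event.
EVENT_HEADER_BYTES = 8


def segment_offsets(events):
    """Return the starting byte offset of each event within one segment."""
    offsets = []
    position = 0
    for event in events:
        offsets.append(position)
        position += EVENT_HEADER_BYTES + len(event.encode("utf-8"))
    return offsets


# Events grouped by segment, in the order shown in the table above.
segment_4 = [
    '{"id":1,"key":"a","message":"Hello world."}',
    '{"id":2,"key":"b","message":"Welcome to Pravega!"}',
    '{"id":3,"key":"a","message":"Another message."}',
]
segment_1 = [
    '{"id":4,"key":"c","message":"Another message."}',
    '{"id":5,"key":"c","message":"Another message."}',
]

print(segment_offsets(segment_4))  # [0, 51, 109]
print(segment_offsets(segment_1))  # [0, 55]
```

The segment numbers on your machine may differ from the ones shown here, but events with the same routing key should still land in the same segment.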

Run a PySpark Streaming job that writes generated data to a Pravega stream

./run_pyspark_app.sh src/main/python/stream_generated_data_to_pravega.py

This job will continue to run and write events until stopped.

Run a PySpark Streaming job that reads from a Pravega stream and writes to the console

rm -rf /tmp/spark-checkpoints-stream_pravega_to_console
./run_pyspark_app.sh src/main/python/stream_pravega_to_console.py

This will read events generated by stream_generated_data_to_pravega.py. It will continue to run until stopped.

You should see output similar to the following every 3 seconds:

-------------------------------------------
Batch: 0
-------------------------------------------
+-----------------------+--------+-----------------+----------+------+
|event                  |scope   |stream           |segment_id|offset|
+-----------------------+--------+-----------------+----------+------+
|2019-05-23 14:40:08.329|examples|streamprocessing1|0         |181753|
|2019-05-23 14:40:10.329|examples|streamprocessing1|0         |181784|
|2019-05-23 14:40:09.329|examples|streamprocessing1|0         |181815|
+-----------------------+--------+-----------------+----------+------+

Run a PySpark Streaming job that reads from a Pravega stream and writes to another Pravega stream

rm -rf /tmp/spark-checkpoints-stream_pravega_to_pravega
./run_spark_app.sh src/main/python/stream_pravega_to_pravega.py

This will read events generated by stream_generated_data_to_pravega.py. It will continue to run until stopped.

Run a Java Spark Streaming job that reads from a Pravega stream and writes to the console

rm -rf /tmp/spark-checkpoints-StreamPravegaToConsole
./run_java_spark_app.sh io.pravega.example.spark.StreamPravegaToConsole

This will read events generated by stream_generated_data_to_pravega.py. It will continue to run until stopped.

You should see output similar to the following every 3 seconds:

+-----------------------+--------+-----------------+----------+------+
|event                  |scope   |stream           |segment_id|offset|
+-----------------------+--------+-----------------+----------+------+
|2019-05-23 14:45:59.329|examples|streamprocessing1|0         |192634|
|2019-05-23 14:46:01.329|examples|streamprocessing1|0         |192665|
|2019-05-23 14:46:00.329|examples|streamprocessing1|0         |192696|
+-----------------------+--------+-----------------+----------+------+

If you press Control-C to abort the job and then restart it, it will resume exactly where it left off. This will be apparent because the batch number will not start at 0. The recovery information is stored in the checkpoint directory. If you delete the checkpoint directory, or specify an empty directory, the job will start from the earliest event because the option start_stream_cut is set to earliest. If you make significant changes to your Spark application, Spark may encounter an error when it tries to load the checkpoint. In this case, you will need to delete the checkpoint directory.
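In PySpark terms, the interaction between the checkpoint directory and start_stream_cut might look like the sketch below. It is not the code of the example jobs. The format name pravega, the option names controller, scope, and stream, and the controller address tcp://127.0.0.1:9090 are assumptions about the connector and the standalone Pravega instance. The scope and stream names are taken from the sample output, the checkpoint path from the rm -rf command for the PySpark console job, and the function names are hypothetical.

```python
import shutil

# Path taken from the rm -rf command used before stream_pravega_to_console.py.
CHECKPOINT_DIR = "/tmp/spark-checkpoints-stream_pravega_to_console"


def pravega_read_options(controller="tcp://127.0.0.1:9090",
                         scope="examples",
                         stream="streamprocessing1"):
    """Reader options; names other than start_stream_cut are assumptions."""
    return {
        "controller": controller,
        "scope": scope,
        "stream": stream,
        # Only used when the checkpoint directory holds no recovery information.
        "start_stream_cut": "earliest",
    }


def start_console_query(spark, checkpoint_dir=CHECKPOINT_DIR):
    """Start a console query on an existing SparkSession (needs the connector)."""
    events = (
        spark.readStream
        .format("pravega")  # assumed connector format name
        .options(**pravega_read_options())
        .load()
    )
    return (
        events.selectExpr("cast(event as string) as event",
                          "scope", "stream", "segment_id", "offset")
        .writeStream
        .trigger(processingTime="3 seconds")
        .outputMode("append")
        .format("console")
        .option("truncate", "false")
        # Spark stores its recovery information here between runs.
        .option("checkpointLocation", checkpoint_dir)
        .start()
    )


def reset_checkpoint(checkpoint_dir=CHECKPOINT_DIR):
    """Python equivalent of rm -rf on the checkpoint directory."""
    shutil.rmtree(checkpoint_dir, ignore_errors=True)
```

Calling reset_checkpoint() before start_console_query(spark) forces the query to start again from the earliest event, which is what the rm -rf commands in this document do.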

Run a PySpark Streaming job in a Spark Cluster

Start a separate Spark server process.

~/spark/current/sbin/start-all.sh

Confirm that you can browse to the Spark Master UI at http://localhost:8080/.

Submit the job.

USE_IN_PROCESS_SPARK=0 ./run_pyspark_app.sh src/main/python/stream_generated_data_to_pravega.py
USE_IN_PROCESS_SPARK=0 ./run_java_spark_app.sh io.pravega.example.spark.StreamPravegaToConsole
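The effect of USE_IN_PROCESS_SPARK can be pictured with the following sketch. The contents of the run scripts are not shown in this document, so the logic here is an assumption: a value of 0 points the job at a standalone master on this host, and anything else keeps the in-process local[2] master. Port 7077 is Spark's default standalone master port, and the function name choose_spark_master is hypothetical.

```python
import os
import socket


def choose_spark_master(env=None):
    """Return the --master value a run script might pass to spark-submit."""
    env = os.environ if env is None else env
    if env.get("USE_IN_PROCESS_SPARK", "1") == "0":
        # Assumption: the standalone master runs on this host, default port.
        return f"spark://{socket.gethostname()}:7077"
    # Default: in-process mini-cluster with two worker threads.
    return "local[2]"


print(choose_spark_master({"USE_IN_PROCESS_SPARK": "0"}))
print(choose_spark_master({}))
```

The master URL that your cluster actually uses is shown at the top of the Spark Master UI.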

(Optional) Configure Anaconda Python

These steps show how to use Anaconda Python to run PySpark applications. Anaconda Python is configured with NumPy, Pandas, and TensorFlow.

Install Conda

See https://www.anaconda.com/rpm-and-debian-repositories-for-miniconda/.

Create Conda Environment

source /opt/conda/etc/profile.d/conda.sh
src/main/python/create_conda_env.sh
conda activate pravega-samples

Run the following before executing spark-submit.

export PYSPARK_PYTHON=$HOME/.conda/envs/pravega-samples/bin/python