This example shows how to use Apache Beam and SCIO to read objects from a Kafka topic, serialize them as Avro, and write the resulting files to Google Cloud Storage.
This example contains two Dataflow pipelines:
- Object2Kafka: generates a set of objects and writes them to Kafka. This is a batch pipeline.
- Kafka2Avro: reads objects from Kafka, converts them to Avro, and writes the output to Google Cloud Storage. This is a streaming pipeline.
Before compiling and generating your package, you need to change some options in src/main/resources/application.conf:
- broker: String with the address of the Kafka brokers.
- dest-bucket: The name of the bucket where the Avro files will be written.
- dest-path: The directory structure where the Avro files will be written (e.g. blank to write in the top-level dir of the bucket, or anything like a/b/c).
- kafka-topic: The name of the topic where the objects are written to, or read from.
- num-demo-objects: Number of objects that will be generated by the Object2Kafka pipeline. These objects can be read with the Kafka2Avro pipeline to test that everything is working as expected.
The configuration file follows the HOCON format.
Here is a sample configuration file with all the options set:
broker = "1.2.3.4:9092"
dest-bucket = "my-bucket-in-gcs"
dest-path = "persisted/fromkafka/avro/"
kafka-topic = "my_kafka_topic"
num-demo-objects = 500 # comments are allowed in the config file
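For reference, here is a minimal sketch of how these options could be read at runtime with the Typesafe Config library, which understands HOCON. This is an illustration rather than the pipelines' actual configuration code, and the object name DemoConfig is hypothetical:

```scala
import com.typesafe.config.{Config, ConfigFactory}

// Hypothetical helper: loads src/main/resources/application.conf from the
// classpath and exposes the options documented above.
object DemoConfig {
  private val config: Config = ConfigFactory.load()

  val broker: String      = config.getString("broker")
  val destBucket: String  = config.getString("dest-bucket")
  val destPath: String    = config.getString("dest-path")
  val kafkaTopic: String  = config.getString("kafka-topic")
  val numDemoObjects: Int = config.getInt("num-demo-objects")
}
```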
This example is written in Scala and uses SBT as its build tool.
You need to have SBT >= 1.0 installed. You can download SBT from https://www.scala-sbt.org/
The Scala version is 2.12.8. If you have a JDK (1.8 or newer) installed, SBT should automatically download the Scala compiler.
Run sbt in the top-level sources folder.
Inside sbt, download all the dependencies:
sbt:kafka2avro> update
and then compile:
sbt:kafka2avro> compile
If you have managed to compile the code, you can generate a JAR package to be deployed on Dataflow, with:
sbt:kafka2avro> pack
This will generate a set of JAR files in target/pack/lib.
This is a batch pipeline, provided just as an example to populate Kafka and test the streaming pipeline.
Once you have generated the JAR file using the pack command inside SBT, you can now launch the job in Dataflow to populate Kafka with some demo objects. Using Java 1.8, run the following command. Notice that you have to set the project id, and a location in a GCS bucket to store the JARs imported by Dataflow:
CLASSPATH="target/pack/lib/*" java com.google.cloud.pso.kafka2avro.Object2Kafka --exec.mainClass=com.google.cloud.pso.kafka2avro.Object2Kafka --project=YOUR_PROJECT_ID --stagingLocation="gs://YOUR_BUCKET/YOUR_STAGING_LOCATION" --runner=DataflowRunner
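For context, the core of a batch pipeline like Object2Kafka can be sketched with SCIO and Beam's KafkaIO as below. This is a simplified, hypothetical version that writes plain strings (the real pipeline serializes richer demo objects), and the broker, topic and count values are simply the ones from the sample configuration above:

```scala
import com.spotify.scio.ContextAndArgs
import org.apache.beam.sdk.io.kafka.KafkaIO
import org.apache.kafka.common.serialization.StringSerializer

object Object2KafkaSketch {
  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, _) = ContextAndArgs(cmdlineArgs)

    // Write only values (no keys) to the configured Kafka topic.
    val writeToKafka = KafkaIO
      .write[Void, String]()
      .withBootstrapServers("1.2.3.4:9092") // broker
      .withTopic("my_kafka_topic")          // kafka-topic
      .withValueSerializer(classOf[StringSerializer])
      .values()

    // Generate num-demo-objects payloads and send them to Kafka.
    sc.parallelize(1 to 500)
      .map(i => s"demo object $i")
      .saveAsCustomOutput("WriteToKafka", writeToKafka)

    sc.run()
  }
}
```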
This is a streaming pipeline. It will keep running unless you cancel it. The default windowing policy is to group messages every 2 minutes, in a fixed window. To change the policy, please see the function windowIn in Kafka2Avro.scala.
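As a hedged illustration of that policy, a helper such as windowIn might look roughly like the following (the real implementation in Kafka2Avro.scala may differ):

```scala
import com.spotify.scio.values.SCollection
import org.joda.time.Duration

object WindowingSketch {
  // Group incoming elements into fixed, non-overlapping 2-minute windows.
  def windowIn[T](in: SCollection[T]): SCollection[T] =
    in.withFixedWindows(Duration.standardMinutes(2))
}
```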
Once you have generated the JAR file using the pack command inside SBT, you can now launch the job in Dataflow to read the objects from Kafka, convert them to Avro, and write them to Google Cloud Storage. Using Java 1.8, run the following command. Notice that you have to set the project id, and a location in a GCS bucket to store the JARs imported by Dataflow:
CLASSPATH="target/pack/lib/*" java com.google.cloud.pso.kafka2avro.Kafka2Avro --exec.mainClass=com.google.cloud.pso.kafka2avro.Kafka2Avro --project=YOUR_PROJECT_ID --stagingLocation="gs://YOUR_BUCKET/YOUR_STAGING_LOCATION" --runner=DataflowRunner
Please remember that the machine running the JAR may need to have connectivity to the Kafka cluster in order to retrieve some metadata, prior to launching the pipeline in Dataflow.
Remember that this is a streaming pipeline; it will keep running until you cancel or stop it.
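For orientation, the read side of a pipeline like Kafka2Avro can be sketched with Beam's KafkaIO wrapped in SCIO's customInput; the broker metadata lookup mentioned above happens when KafkaIO is expanded at pipeline construction time. The sketch below is hypothetical and simplified: it reads string payloads, applies the 2-minute windows, and writes windowed text files instead of the Avro output produced by the real pipeline:

```scala
import com.spotify.scio.ContextAndArgs
import org.apache.beam.sdk.io.TextIO
import org.apache.beam.sdk.io.kafka.KafkaIO
import org.apache.kafka.common.serialization.StringDeserializer
import org.joda.time.Duration

object Kafka2AvroSketch {
  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, _) = ContextAndArgs(cmdlineArgs)

    // Unbounded source: KafkaIO contacts the brokers for partition metadata
    // while the pipeline is being built, hence the connectivity requirement.
    val readFromKafka = KafkaIO
      .read[String, String]()
      .withBootstrapServers("1.2.3.4:9092") // broker
      .withTopic("my_kafka_topic")          // kafka-topic
      .withKeyDeserializer(classOf[StringDeserializer])
      .withValueDeserializer(classOf[StringDeserializer])
      .withoutMetadata()

    sc.customInput("ReadFromKafka", readFromKafka)
      .map(_.getValue)                               // keep only the payload
      .withFixedWindows(Duration.standardMinutes(2)) // same policy as windowIn
      .saveAsCustomOutput(
        "WriteWindowedFiles",
        TextIO
          .write()
          .to("gs://my-bucket-in-gcs/persisted/fromkafka/") // dest-bucket/dest-path
          .withWindowedWrites()
          .withNumShards(1))

    sc.run()
  }
}
```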
In some cases, some dependencies may be downloaded with wrong filenames, for instance containing symbols that need to be escaped. Importing these JARs in the Dataflow job will fail.
If your Dataflow job fails before it is launched because it cannot copy some dependencies, rename the offending files so they don't contain such symbols. For instance:
mv target/pack/lib/netty-codec-http2-\[4.1.25.Final,4.1.25.Final\].jar target/pack/lib/netty-codec-http2.jar
This example includes a configuration file for Cloud Build, so you can use it to run the unit tests with every commit done to your repository. To use this configuration file:
- Add your sources to a Git repository (either in Bitbucket, GitHub or Google Cloud Source Repositories).
- Configure a trigger in Google Cloud Build linked to your Git repository.
- Set the path for the configuration file to cloudbuild.yaml.
The included configuration file will do the following steps:
- Download a cache for Ivy2 from a Google Cloud Storage bucket named YOURPROJECT_cache, where YOURPROJECT is your GCP project id.
- Compile and test the Scala code.
- Generate a package.
- Upload the new Ivy2 cache to the same bucket as in the first step.
- Upload the generated package and all its dependencies to a bucket named YOURPROJECT_pkgs, where YOURPROJECT is your GCP project id.
So these default steps will try to write to and read from two different buckets in Google Cloud Storage. Please either create these buckets in your GCP project, or change the configuration.
Please note that you need to build and include the scala-sbt Cloud Builder in order to use this configuration file.
- Make sure you have the Google Cloud SDK configured with your credentials and project
- Download the sources from GoogleCloudPlatform/cloud-builders-community/tree/master/scala-sbt
- Then, in the scala-sbt sources dir, run gcloud builds submit . --config=cloudbuild.yaml to add the builder to your GCP project. You only need to do this once.