Skip to content

Commit

Permalink
[kafka-to-elasticsearch] Migrate akka dependencies to pekko (#18)
Browse files Browse the repository at this point in the history
* [kafka-to-elasticsearch] Migrate akka dependencies to pekko

* [kafka-to-elasticsearch] Migrate akka dependencies to pekko

* use confluentinc/cp-kafka:5.4.1

* try to fix doc build

---------

Co-authored-by: laglang <[email protected]>
Co-authored-by: PJ Fanning <[email protected]>
  • Loading branch information
3 people authored Dec 17, 2023
1 parent 4f96138 commit adec5c6
Show file tree
Hide file tree
Showing 24 changed files with 283 additions and 286 deletions.
40 changes: 20 additions & 20 deletions docs/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ FtpToFile / paradoxProperties ++= Map(
// Pekko Connectors
"scaladoc.org.apache.pekko.stream.connectors.base_url" -> s"https://pekko.apache.org/api/pekko-connectors/${Dependencies.FtpToFile.PekkoConnectorsVersion}",
"javadoc.org.apache.pekko.stream.connectors.base_url" -> "",
"extref.org.apache.pekko.stream.connectors.base_url" -> s"https://pekko.apache.org/docs/pekko-connectors/${Dependencies.FtpToFile.PekkoConnectorsVersion}/%s",
"extref.pekko-connectors.base_url" -> s"https://pekko.apache.org/docs/pekko-connectors/${Dependencies.FtpToFile.PekkoConnectorsVersion}/%s",
// Pekko
"scaladoc.org.apache.pekko.base_url" -> s"https://pekko.apache.org/api/pekko/${Dependencies.FtpToFile.PekkoVersion}",
"javadoc.org.apache.pekko.base_url" -> s"https://pekko.apache.org/japi/pekko/${Dependencies.FtpToFile.PekkoVersion}",
"extref.org.apache.pekko.base_url" -> s"https://pekko.apache.org/docs/pekko/${Dependencies.FtpToFile.PekkoVersion}/%s",
"extref.pekko.base_url" -> s"https://pekko.apache.org/docs/pekko/${Dependencies.FtpToFile.PekkoVersion}/%s",
)
FtpToFile / paradoxGroups := Map("Language" -> Seq("Java", "Scala"))

Expand All @@ -52,9 +52,9 @@ HttpCsvToKafka / paradoxProperties ++= Map(
"javadoc.akka.base_url" -> "",
"extref.alpakka.base_url" -> s"https://doc.akka.io/docs/alpakka/${Dependencies.HttpCsvToKafka.PekkoConnectorsVersion}/%s",
// Pekko Connectors Kafka
"scaladoc.akka.kafka.base_url" -> s"https://doc.akka.io/api/alpakka-kafka/${Dependencies.HttpCsvToKafka.AlpakkaKafkaVersion}",
"scaladoc.akka.kafka.base_url" -> s"https://doc.akka.io/api/alpakka-kafka/${Dependencies.HttpCsvToKafka.PekkoConnectorsKafkaVersion}",
"javadoc.akka.kafka.base_url" -> "",
"extref.alpakka-kafka.base_url" -> s"https://doc.akka.io/docs/alpakka-kafka/${Dependencies.HttpCsvToKafka.AlpakkaKafkaVersion}/%s",
"extref.pekko-connectors-kafka.base_url" -> s"https://doc.akka.io/docs/alpakka-kafka/${Dependencies.HttpCsvToKafka.PekkoConnectorsKafkaVersion}/%s",
// Pekko
"scaladoc.akka.base_url" -> s"https://doc.akka.io/api/akka/${Dependencies.HttpCsvToKafka.PekkoVersion}",
"javadoc.akka.base_url" -> s"https://doc.akka.io/japi/akka/${Dependencies.HttpCsvToKafka.PekkoVersion}",
Expand Down Expand Up @@ -126,17 +126,17 @@ KafkaToElasticsearch / paradoxProperties ++= Map(
"snip.build.base_dir" -> s"${baseDirectory.value}/../pekko-connectors-sample-${KafkaToElasticsearch.name}",
"github.root.base_dir" -> s"${baseDirectory.value}/..",
// Pekko Connectors
"scaladoc.akka.stream.alpakka.base_url" -> s"https://doc.akka.io/api/alpakka/${Dependencies.KafkaToElasticsearch.PekkoConnectorsVersion}",
"javadoc.akka.base_url" -> "",
"extref.alpakka.base_url" -> s"https://doc.akka.io/docs/alpakka/${Dependencies.KafkaToElasticsearch.PekkoConnectorsVersion}/%s",
"scaladoc.org.apache.pekko.stream.connectors.base_url" -> s"https://pekko.apache.org/api/pekko-connectors/${Dependencies.KafkaToElasticsearch.PekkoConnectorsVersion}",
"javadoc.org.apache.pekko.stream.connectors.base_url" -> "",
"extref.pekko-connectors.base_url" -> s"https://pekko.apache.org/docs/pekko-connectors/${Dependencies.KafkaToElasticsearch.PekkoConnectorsVersion}/%s",
// Pekko Connectors Kafka
"scaladoc.akka.kafka.base_url" -> s"https://doc.akka.io/api/alpakka-kafka/${Dependencies.KafkaToElasticsearch.AlpakkaKafkaVersion}",
"javadoc.akka.kafka.base_url" -> "",
"extref.alpakka-kafka.base_url" -> s"https://doc.akka.io/docs/alpakka-kafka/${Dependencies.KafkaToElasticsearch.AlpakkaKafkaVersion}/%s",
"scaladoc.org.apache.pekko.kafka.base_url" -> s"https://pekko.apache.org/api/alpakka-kafka/${Dependencies.KafkaToElasticsearch.PekkoConnectorsKafkaVersion}",
"javadoc.org.apache.pekko.kafka.base_url" -> "",
"extref.pekko-connectors-kafka.base_url" -> s"https://pekko.apache.org/docs/alpakka-kafka/${Dependencies.KafkaToElasticsearch.PekkoConnectorsKafkaVersion}/%s",
// Pekko
"scaladoc.akka.base_url" -> s"https://doc.akka.io/api/akka/${Dependencies.KafkaToElasticsearch.PekkoVersion}",
"javadoc.akka.base_url" -> s"https://doc.akka.io/japi/akka/${Dependencies.KafkaToElasticsearch.PekkoVersion}",
"extref.akka.base_url" -> s"https://doc.akka.io/docs/akka/${Dependencies.KafkaToElasticsearch.PekkoVersion}/%s",
"scaladoc.org.apache.pekko.base_url" -> s"https://pekko.apache.org/api/pekko/${Dependencies.KafkaToElasticsearch.PekkoVersion}",
"javadoc.org.apache.pekko.base_url" -> s"https://pekko.apache.org/japi/pekko/${Dependencies.KafkaToElasticsearch.PekkoVersion}",
"extref.pekko.base_url" -> s"https://pekko.apache.org/docs/pekko/${Dependencies.KafkaToElasticsearch.PekkoVersion}/%s",
)
KafkaToElasticsearch / paradoxGroups := Map("Language" -> Seq("Java", "Scala"))

Expand All @@ -156,9 +156,9 @@ KafkaToWebsocketClients / paradoxProperties ++= Map(
// "javadoc.akka.base_url" -> "",
// "extref.alpakka.base_url" -> s"https://doc.akka.io/docs/alpakka/${Dependencies.KafkaToWebsocketClients.PekkoConnectorsVersion}/%s",
// Pekko Connectors Kafka
"scaladoc.akka.kafka.base_url" -> s"https://doc.akka.io/api/alpakka-kafka/${Dependencies.KafkaToWebsocketClients.AlpakkaKafkaVersion}",
"scaladoc.akka.kafka.base_url" -> s"https://doc.akka.io/api/alpakka-kafka/${Dependencies.KafkaToWebsocketClients.PekkoConnectorsKafkaVersion}",
"javadoc.akka.kafka.base_url" -> "",
"extref.alpakka-kafka.base_url" -> s"https://doc.akka.io/docs/alpakka-kafka/${Dependencies.KafkaToWebsocketClients.AlpakkaKafkaVersion}/%s",
"extref.pekko-connectors-kafka.base_url" -> s"https://doc.akka.io/docs/alpakka-kafka/${Dependencies.KafkaToWebsocketClients.PekkoConnectorsKafkaVersion}/%s",
// Pekko
"scaladoc.akka.base_url" -> s"https://doc.akka.io/api/akka/${Dependencies.KafkaToWebsocketClients.PekkoVersion}",
"javadoc.akka.base_url" -> s"https://doc.akka.io/japi/akka/${Dependencies.KafkaToWebsocketClients.PekkoVersion}",
Expand Down Expand Up @@ -186,9 +186,9 @@ MqttToKafka / paradoxProperties ++= Map(
"javadoc.akka.base_url" -> "",
"extref.alpakka.base_url" -> s"https://doc.akka.io/docs/alpakka/${Dependencies.MqttToKafka.PekkoConnectorsVersion}/%s",
// Pekko Connectors Kafka
"scaladoc.akka.kafka.base_url" -> s"https://doc.akka.io/api/alpakka-kafka/${Dependencies.MqttToKafka.AlpakkaKafkaVersion}",
"scaladoc.akka.kafka.base_url" -> s"https://doc.akka.io/api/alpakka-kafka/${Dependencies.MqttToKafka.PekkoConnectorsKafkaVersion}",
"javadoc.akka.kafka.base_url" -> "",
"extref.alpakka-kafka.base_url" -> s"https://doc.akka.io/docs/alpakka-kafka/${Dependencies.MqttToKafka.AlpakkaKafkaVersion}/%s",
"extref.pekko-connectors-kafka.base_url" -> s"https://doc.akka.io/docs/alpakka-kafka/${Dependencies.MqttToKafka.PekkoConnectorsKafkaVersion}/%s",
// Pekko
"scaladoc.akka.base_url" -> s"https://doc.akka.io/api/akka/${Dependencies.MqttToKafka.PekkoVersion}",
"javadoc.akka.base_url" -> s"https://doc.akka.io/japi/akka/${Dependencies.MqttToKafka.PekkoVersion}",
Expand All @@ -211,11 +211,11 @@ FileToElasticsearch / paradoxProperties ++= Map(
// Pekko Connectors
"scaladoc.org.apache.pekko.stream.connectors.base_url" -> s"https://pekko.apache.org/api/pekko-connectors/${Dependencies.FileToElasticsearch.PekkoConnectorsVersion}",
"javadoc.org.apache.pekko.stream.connectors.base_url" -> "",
"extref.org.apache.pekko.stream.connectors.base_url" -> s"https://pekko.apache.org/docs/pekko-connectors/${Dependencies.FileToElasticsearch.PekkoConnectorsVersion}/%s",
"extref.pekko-connectors.base_url" -> s"https://pekko.apache.org/docs/pekko-connectors/${Dependencies.FileToElasticsearch.PekkoConnectorsVersion}/%s",
// Pekko
"scaladoc.org.apache.pekko.base_url" -> s"https://pekko.apache.org/api/pekko/${Dependencies.FileToElasticsearch.PekkoVersion}",
"javadoc.org.apache.pekko.base_url" -> s"https://pekko.apache.org/japi/pekko/${Dependencies.FileToElasticsearch.PekkoVersion}",
"extref.org.apache.pekko.base_url" -> s"https://pekko.apache.org/docs/pekko/${Dependencies.FileToElasticsearch.PekkoVersion}/%s",
"extref.pekko.base_url" -> s"https://pekko.apache.org/docs/pekko/${Dependencies.FileToElasticsearch.PekkoVersion}/%s",
)
FileToElasticsearch / paradoxGroups := Map("Language" -> Seq("Java", "Scala"))

Expand Down Expand Up @@ -246,7 +246,7 @@ Paradox / siteSubdirName := ""
paradoxProperties ++= Map(
"extref.akka.base_url" -> "https://doc.akka.io/docs/akka/current/",
"extref.alpakka.base_url" -> "https://doc.akka.io/docs/alpakka/current/",
"extref.alpakka-kafka.base_url" -> "https://doc.akka.io/docs/alpakka-kafka/current/",
"extref.pekko-connectors-kafka.base_url" -> "https://doc.akka.io/docs/alpakka-kafka/current/",
"extref.ftp-to-file.base_url" -> s"${(FtpToFile / siteSubdirName).value}/",
"extref.http-csv-to-kafka.base_url" -> s"${(HttpCsvToKafka / siteSubdirName).value}/",
"extref.jdbc-to-elasticsearch.base_url" -> s"${(JdbcToElasticsearch / siteSubdirName).value}/",
Expand Down
8 changes: 4 additions & 4 deletions docs/project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ object Dependencies {
val PekkoVersion = versions("PekkoVersion")
val AkkaHttpVersion = versions("AkkaHttpVersion")
val PekkoConnectorsVersion = versions("PekkoConnectorsVersion")
val AlpakkaKafkaVersion = versions("AlpakkaKafkaVersion")
val PekkoConnectorsKafkaVersion = versions("PekkoConnectorsKafkaVersion")
}

object JdbcToElasticsearch {
Expand Down Expand Up @@ -75,7 +75,7 @@ object Dependencies {
val ScalaVersion = versions("scalaVer")
val PekkoVersion = versions("PekkoVersion")
val PekkoConnectorsVersion = versions("PekkoConnectorsVersion")
val AlpakkaKafkaVersion = versions("AlpakkaKafkaVersion")
val PekkoConnectorsKafkaVersion = versions("PekkoConnectorsKafkaVersion")
}

object KafkaToWebsocketClients {
Expand All @@ -90,7 +90,7 @@ object Dependencies {
val ScalaVersion = versions("scalaVer")
val PekkoVersion = versions("PekkoVersion")
val AkkaHttpVersion = versions("AkkaHttpVersion")
val AlpakkaKafkaVersion = versions("AlpakkaKafkaVersion")
val PekkoConnectorsKafkaVersion = versions("PekkoConnectorsKafkaVersion")
}

object MqttToKafka {
Expand All @@ -105,7 +105,7 @@ object Dependencies {
val ScalaVersion = versions("scalaVer")
val PekkoVersion = versions("PekkoVersion")
val PekkoConnectorsVersion = versions("PekkoConnectorsVersion")
val AlpakkaKafkaVersion = versions("AlpakkaKafkaVersion")
val PekkoConnectorsKafkaVersion = versions("PekkoConnectorsKafkaVersion")
}

object FileToElasticsearch {
Expand Down
2 changes: 1 addition & 1 deletion docs/src/main/paradox/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ This example uses Pekko Connectors File to watch for new files created in a dire
Read a stream of data and store it zipped in rotating files on an SFTP server.

### @extref:[Subscribe to MQTT and produce to Kafka](mqtt-to-kafka:index.html)
Subscribe to an MQTT topic with @extref[Pekko Connectors MQTT](pekko-connectors:/mqtt.html), group a few values and publish the aggregate to a Kafka topic with @extref[Pekko Connectors Kafka](alpakka-kafka:).
Subscribe to an MQTT topic with @extref[Pekko Connectors MQTT](pekko-connectors:/mqtt.html), group a few values and publish the aggregate to a Kafka topic with @extref[Pekko Connectors Kafka](pekko-connectors-kafka:).

### @link:[Amazon SQS](https://github.com/apache/incubator-pekko-connectors-samples/tree/main/pekko-connectors-sample-sqs-java) { open=new }
Listen to an Amazon SQS topic, enrich the message via calling an actor, publish a new message to SQS and acknowledge/delete the original message. (Java only)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ object Dependencies {
val PekkoVersion = "2.6.19"
val AkkaHttpVersion = "10.1.12"
val PekkoConnectorsVersion = "4.0.0"
val AlpakkaKafkaVersion = "3.0.1"
val PekkoConnectorsKafkaVersion = "3.0.1"

val dependencies = List(
"com.lightbend.akka" %% "akka-stream-alpakka-csv" % PekkoConnectorsVersion,
"com.typesafe.akka" %% "akka-stream-kafka" % AlpakkaKafkaVersion,
"com.typesafe.akka" %% "akka-stream-kafka" % PekkoConnectorsKafkaVersion,
"com.typesafe.akka" %% "akka-actor-typed" % PekkoVersion,
"com.typesafe.akka" %% "akka-stream" % PekkoVersion,
"com.typesafe.akka" %% "akka-http" % AkkaHttpVersion,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@

### Description

The binary data in @scaladoc:[ByteString](akka.util.ByteString)s is passed into @extref:[Alpakka CSV](alpakka:data-transformations/csv.html) to be parsed and converted per line into a Map. The stream elements becomes a @scala[`Map[String, ByteString]`]@java[`Map<String, ByteString>`], one entry per column using the column headers as keys.
The binary data in @scaladoc:[ByteString](akka.util.ByteString)s is passed into @extref:[Pekko-Connectors CSV](alpakka:data-transformations/csv.html) to be parsed and converted per line into a Map. The stream elements becomes a @scala[`Map[String, ByteString]`]@java[`Map<String, ByteString>`], one entry per column using the column headers as keys.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@

[Testcontainers](https://www.testcontainers.org/) starts a Kafka broker in Docker.

@extref:[Alpakka Kafka](alpakka-kafka:producer.html) producer settings specify the broker address and the data types for Kafka's key and value.
@extref:[Pekko-Connectors Kafka](pekko-connectors-kafka:producer.html) producer settings specify the broker address and the data types for Kafka's key and value.

@scaladoc:[Producer.plainSink](akka.kafka.scaladsl.Producer$) sends the `ProducerRecord`s stream elements to the specified Kafka topic.
2 changes: 1 addition & 1 deletion pekko-connectors-sample-kafka-to-elasticsearch/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

## Read from a Kafka topic and publish to Elasticsearch

This example uses @extref[Alpakka Kafka](alpakka-kafka:) to subscribe to a Kafka topic, parses JSON into a data class and stores the object in Elasticsearch. After storing the Kafka offset is committed back to Kafka. This gives at-least-once semantics.
This example uses @extref[Pekko-Connectors Kafka](pekko-connectors-kafka:) to subscribe to a Kafka topic, parses JSON into a data class and stores the object in Elasticsearch. After storing the Kafka offset is committed back to Kafka. This gives at-least-once semantics.

Browse the sources at @link:[Github](https://github.com/apache/incubator-pekko-connectors-samples/tree/main/pekko-connectors-sample-kafka-to-elasticsearch) { open=new }.

Expand Down
2 changes: 1 addition & 1 deletion pekko-connectors-sample-kafka-to-elasticsearch/build.sbt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
organization := "com.lightbend.akka"
organization := "org.apache.pekko"
version := "1.0.0"
scalaVersion := Dependencies.scalaVer
libraryDependencies ++= Dependencies.dependencies
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,28 @@ import sbt._
object Dependencies {
val scalaVer = "2.13.12"
// #deps
val PekkoVersion = "2.6.19"
val PekkoConnectorsVersion = "4.0.0"
val AlpakkaKafkaVersion = "3.0.1"
val PekkoVersion = "1.0.2"
val PekkoConnectorsVersion = "1.0.1"
val PekkoConnectorsKafkaVersion = "1.0.0"

// #deps

val dependencies = List(
// #deps
"com.lightbend.akka" %% "akka-stream-alpakka-elasticsearch" % PekkoConnectorsVersion,
"com.typesafe.akka" %% "akka-stream-kafka" % AlpakkaKafkaVersion,
"com.typesafe.akka" %% "akka-stream" % PekkoVersion,
"com.typesafe.akka" %% "akka-actor-typed" % PekkoVersion,
"com.typesafe.akka" %% "akka-actor" % PekkoVersion,
// #deps
"org.apache.pekko" %% "pekko-connectors-elasticsearch" % PekkoConnectorsVersion,
"org.apache.pekko" %% "pekko-connectors-kafka" % PekkoConnectorsKafkaVersion,
"org.apache.pekko" %% "pekko-stream" % PekkoVersion,
"org.apache.pekko" %% "pekko-actor-typed" % PekkoVersion,
"org.apache.pekko" %% "pekko-actor" % PekkoVersion,
// for JSON in Scala
"io.spray" %% "spray-json" % "1.3.6",
// for JSON in Java
"com.fasterxml.jackson.datatype" % "jackson-datatype-jdk8" % "2.13.3",
"com.fasterxml.jackson.datatype" % "jackson-datatype-jsr310" % "2.13.3",
"com.fasterxml.jackson.datatype" % "jackson-datatype-jdk8" % "2.14.3",
"com.fasterxml.jackson.datatype" % "jackson-datatype-jsr310" % "2.14.3",
// Logging
"com.typesafe.akka" %% "akka-slf4j" % PekkoVersion,
"org.apache.pekko" %% "pekko-slf4j" % PekkoVersion,
"ch.qos.logback" % "logback-classic" % "1.2.13",
// #deps
"org.testcontainers" % "elasticsearch" % "1.17.3",
"org.testcontainers" % "kafka" % "1.17.3"
)
// #deps
"org.testcontainers" % "elasticsearch" % "1.17.6",
"org.testcontainers" % "kafka" % "1.17.6")
}
Loading

0 comments on commit adec5c6

Please sign in to comment.