Implements a KafkaConsumerResource (monix#240)

Trigger ci
Remove printline
Update scala version build
Refinement
Bring back serialization test

paualarco committed Oct 3, 2021
1 parent 0d82565 commit 5e238d1
Showing 10 changed files with 225 additions and 41 deletions.
1 change: 0 additions & 1 deletion .bsp/sbt.json

This file was deleted.

45 changes: 45 additions & 0 deletions .github/workflows/build.yml
@@ -0,0 +1,45 @@
name: build

on: [push, pull_request]

jobs:

  tests:
    name: scala-${{ matrix.scala }} jdk-${{ matrix.java }} tests
    runs-on: ubuntu-latest

    strategy:
      fail-fast: true
      matrix:
        java: [8]
        scala: [2.11.12, 2.12.15]

    steps:
      - uses: actions/checkout@v2
      - uses: olafurpg/setup-scala@v10
        with:
          java-version: "adopt@1.${{ matrix.java }}"

      - name: Cache SBT Coursier directory
        uses: actions/cache@v1
        with:
          path: ~/.cache/coursier/v1
          key: ${{ runner.os }}-coursier-${{ hashFiles('**/*.sbt') }}
          restore-keys: |
            ${{ runner.os }}-coursier-
      - name: Cache SBT directory
        uses: actions/cache@v1
        with:
          path: ~/.sbt
          key: |
            ${{ runner.os }}-sbt-${{ hashFiles('project/build.properties') }}-${{ hashFiles('project/plugins.sbt') }}
          restore-keys: ${{ runner.os }}-sbt-

      - name: Run Tests for Kafka 0.10.x, Java ${{ matrix.java }} and Scala ${{ matrix.scala }}
        run: sbt -J-Xmx6144m kafka10/test

      - name: Run Tests for Kafka 0.11.x, Java ${{ matrix.java }} and Scala ${{ matrix.scala }}
        run: sbt -J-Xmx6144m kafka11/test

      - name: Run Tests for Kafka 1.x.x, Java ${{ matrix.java }} and Scala ${{ matrix.scala }}
        run: sbt -J-Xmx6144m kafka1x/test
2 changes: 2 additions & 0 deletions .gitignore
@@ -16,3 +16,5 @@ project/plugins/project/
.scala_dependencies
.worksheet
.idea

.bsp/
3 changes: 2 additions & 1 deletion build.sbt
@@ -197,8 +197,9 @@ lazy val commonDependencies = Seq(
"ch.qos.logback" % "logback-classic" % "1.2.3" % "test",
"org.scalatest" %% "scalatest" % "3.0.9" % "test",
"org.scalacheck" %% "scalacheck" % "1.15.2" % "test",
"io.github.embeddedkafka" %% "embedded-kafka" % "2.4.1" force()
"io.github.embeddedkafka" %% "embedded-kafka" % "2.1.0" % Test force()
),
dependencyOverrides += "io.github.embeddedkafka" %% "embedded-kafka" % "2.1.0" % Test
)

lazy val monixKafka = project.in(file("."))
2 changes: 1 addition & 1 deletion kafka-0.11.x/src/test/resources/logback.xml
@@ -5,7 +5,7 @@
</encoder>
</appender>

<root level="WARN">
<root level="ERROR">
<appender-ref ref="STDOUT" />
</root>
</configuration>
@@ -25,11 +25,12 @@ import org.apache.kafka.clients.consumer.{Consumer, ConsumerRecord, KafkaConsume
import scala.concurrent.blocking
import scala.util.matching.Regex

/** Exposes an `Observable` that consumes a Kafka stream by
* means of a Kafka Consumer client.
/** Acquires and releases a [[KafkaConsumer]] within a [[Resource]],
 * exposed in the form of a [[KafkaConsumerObservable]] that consumes
 * and emits records from the specified topics.
*
* In order to get initialized, it needs a configuration. See the
* [[KafkaConsumerConfig]] needed and see `monix/kafka/default.conf`,
* In order to get initialized, it needs a configuration.
* @see [[KafkaConsumerConfig]] and `monix/kafka/default.conf`
* (in the resource files), which exposes all the default values.
*/
object KafkaConsumerResource {
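
For reference, a minimal sketch of the acquire/release pattern the Scaladoc above describes; the consumerResource helper below is a hypothetical illustration of that shape, not the signature this commit introduces:

import cats.effect.Resource
import monix.eval.Task
import org.apache.kafka.clients.consumer.{Consumer, KafkaConsumer}
import java.util.Properties

// Hypothetical helper, for illustration only: the consumer is created on
// acquisition and closed on release, so downstream code cannot leak it.
def consumerResource(props: Properties): Resource[Task, Consumer[String, String]] =
  Resource.make(
    Task.evalAsync(new KafkaConsumer[String, String](props)) // acquire
  )(consumer => Task.evalAsync(consumer.close()))            // release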
2 changes: 1 addition & 1 deletion kafka-1.0.x/src/test/resources/logback-test.xml
@@ -5,7 +5,7 @@
</encoder>
</appender>

<root level="WARN">
<root level="ERROR">
<appender-ref ref="STDOUT" />
</root>

@@ -16,13 +16,13 @@ class KafkaConsumerResourceSpec extends FunSuite with KafkaTestKit with ScalaChe

val consumerConf: KafkaConsumerConfig = KafkaConsumerConfig.default.copy(
bootstrapServers = List("127.0.0.1:6001"),
groupId = "failing-logic",
groupId = "monix-closeable-consumer-test",
autoOffsetReset = AutoOffsetReset.Earliest
)

val producerCfg = KafkaProducerConfig.default.copy(
bootstrapServers = List("127.0.0.1:6001"),
clientId = "monix-kafka-producer-test"
clientId = "monix-closeable-consumer-test"
)

test("async commit fails when observable was already cancelled") {
@@ -1,15 +1,14 @@
package monix.kafka

import cats.effect.Resource
import monix.eval.Task
import monix.kafka.config.AutoOffsetReset
import monix.reactive.Observable
import org.apache.kafka.clients.producer.ProducerRecord
import org.scalatest.{FunSuite, Matchers}

import scala.concurrent.duration._
import scala.concurrent.Await
import monix.execution.Scheduler.Implicits.global
import net.manub.embeddedkafka.EmbeddedKafka
import org.apache.kafka.clients.consumer.OffsetCommitCallback
import org.apache.kafka.common.TopicPartition
import org.scalacheck.Gen
@@ -30,43 +29,45 @@ class MergeByCommitCallbackTest extends FunSuite with KafkaTestKit with ScalaChe
commit <- Gen.oneOf(commitCallbacks)
} yield CommittableOffset(new TopicPartition("topic", partition), offset, commit)

private def logic(bootstrapServer: String, topic: String) = {
val kafkaConfig: KafkaConsumerConfig = KafkaConsumerConfig.default.copy(
bootstrapServers = List(bootstrapServer),
groupId = "failing-logic",
autoOffsetReset = AutoOffsetReset.Earliest
)
KafkaConsumerObservable
.manualCommit[String, String](kafkaConfig, List(topic))
.timeoutOnSlowUpstreamTo(6.seconds, Observable.empty)
.foldLeft(CommittableOffsetBatch.empty) { case (batch, message) => batch.updated(message.committableOffset) }
.map{completeBatch =>
{Task.unit >> Task.sleep(3.second) >> Task.evalAsync(println("Committing async!!!")) >> completeBatch.commitAsync()}.runSyncUnsafe()
}
.headL
}
test("merge by commit callback works") {
forAll(Gen.nonEmptyListOf(committableOffsetsGen)) { offsets =>
val partitions = offsets.map(_.topicPartition)
val received: List[CommittableOffsetBatch] = CommittableOffsetBatch.mergeByCommitCallback(offsets)

test("async commit finalizes successfully after cancellation") {
EmbeddedKafka.start()
val batchSize = 10
received.foreach { batch => partitions should contain allElementsOf batch.offsets.keys }

val topicName = "random_topic"
received.size should be <= 4
}
}
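
The property above reads as: offsets sharing a commit callback collapse into a single batch, so the merged result never holds more batches than there are distinct callbacks (four in this suite). A toy sketch of that grouping idea, with stand-in types rather than the library's own:

// Stand-in types for illustration only; the real merge operates on
// monix-kafka's CommittableOffset and CommittableOffsetBatch.
final case class ToyOffset(partition: Int, offset: Long, callback: AnyRef)

// Group by callback reference; the default AnyRef equality is identity.
def mergeByCallback(offsets: List[ToyOffset]): List[List[ToyOffset]] =
  offsets.groupBy(_.callback).values.toList

val cb1, cb2 = new AnyRef
val merged = mergeByCallback(
  List(ToyOffset(0, 1L, cb1), ToyOffset(1, 2L, cb1), ToyOffset(0, 3L, cb2)))
// merged.length == 2: one group per distinct callback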

test("merge by commit callback for multiple consumers") {
withRunningKafka {
val count = 10000
val topicName = "monix-kafka-merge-by-commit"

val producerCfg = KafkaProducerConfig.default.copy(
bootstrapServers = List("127.0.0.1:6001"),
clientId = "monix-kafka-producer-test"
clientId = "monix-kafka-1-0-producer-test"
)

val t = for {
_ <- Resource.liftF(Task(KafkaProducer[String, String](producerCfg, io))).use { producer =>
Task(producer.send(new ProducerRecord(topicName, "message1"))) >>
Task(producer.send(new ProducerRecord(topicName, "message2")))
}
_ <- logic("127.0.0.1:6001", topicName)
} yield ()
t.runSyncUnsafe()
val producer = KafkaProducerSink[String, String](producerCfg, io)

val pushT = Observable
.range(0, count)
.map(msg => new ProducerRecord(topicName, "obs", msg.toString))
.bufferIntrospective(1024)
.consumeWith(producer)

val listT = Observable
.range(0, 4)
.mergeMap(i => createConsumer(i.toInt, topicName).take(500))
.bufferTumbling(2000)
.map(CommittableOffsetBatch.mergeByCommitCallback)
.map { offsetBatches => assert(offsetBatches.length == 4) }
.completedL

EmbeddedKafka.stop()
Await.result(Task.parZip2(listT, pushT).runToFuture, 60.seconds)
}
}

private def createConsumer(i: Int, topicName: String): Observable[CommittableOffset] = {
135 changes: 135 additions & 0 deletions kafka-1.0.x/src/test/scala/monix/kafka/SerializationTest.scala
@@ -0,0 +1,135 @@
package monix.kafka

import java.util

import monix.eval.Task
import monix.kafka.config.AutoOffsetReset
import monix.reactive.Observable
import org.apache.kafka.clients.producer.ProducerRecord
import org.scalatest.FunSuite
import org.apache.kafka.common.serialization.{Serializer => KafkaSerializer}
import org.apache.kafka.common.serialization.{Deserializer => KafkaDeserializer}

import scala.concurrent.duration._
import scala.concurrent.Await
import monix.execution.Scheduler.Implicits.global
import monix.execution.exceptions.DummyException

class SerializationTest extends FunSuite with KafkaTestKit {

val producerCfg = KafkaProducerConfig.default.copy(
bootstrapServers = List("127.0.0.1:6001"),
clientId = "monix-kafka-1-0-serialization-test"
)

val consumerCfg = KafkaConsumerConfig.default.copy(
bootstrapServers = List("127.0.0.1:6001"),
groupId = "kafka-tests",
clientId = "monix-kafka-1-0-serialization-test",
autoOffsetReset = AutoOffsetReset.Earliest
)

test("serialization/deserialization using kafka.common.serialization") {
withRunningKafka {
val topicName = "monix-kafka-serialization-tests"
val count = 10000

implicit val serializer: KafkaSerializer[A] = new ASerializer
implicit val deserializer: KafkaDeserializer[A] = new ADeserializer

val producer = KafkaProducerSink[String, A](producerCfg, io)
val consumer = KafkaConsumerObservable[String, A](consumerCfg, List(topicName)).executeOn(io).take(count)

val pushT = Observable
.range(0, count)
.map(msg => new ProducerRecord(topicName, "obs", A(msg.toString)))
.bufferIntrospective(1024)
.consumeWith(producer)

val listT = consumer
.map(_.value())
.toListL

val (result, _) = Await.result(Task.parZip2(listT, pushT).runToFuture, 60.seconds)
assert(result.map(_.value.toInt).sum === (0 until count).sum)
}
}

test("allow to fail the stream if serialization throws") {
withRunningKafka {
val topicName = "monix-kafka-serialization-failing-tests"
val dummy = DummyException("boom")

implicit val serializer: KafkaSerializer[A] = new AFailingSerializer

val producer = KafkaProducerSink[String, A](producerCfg, io, (_: Throwable) => Task.raiseError(dummy))

val pushT = Observable
.evalOnce(new ProducerRecord(topicName, "obs", A(1.toString)))
.bufferIntrospective(1024)
.consumeWith(producer)

assertThrows[DummyException] {
Await.result(pushT.runToFuture, 60.seconds)
}
}
}

test("allow to recover from serialization errors") {
withRunningKafka {
val topicName = "monix-kafka-serialization-continuing-tests"
val count = 100

implicit val serializer: KafkaSerializer[A] = new AHalfFailingSerializer
implicit val deserializer: KafkaDeserializer[A] = new ADeserializer

val producer = KafkaProducerSink[String, A](producerCfg, io)
val consumer = KafkaConsumerObservable[String, A](consumerCfg, List(topicName)).executeOn(io).take(count / 2)

val pushT = Observable
.range(0, count)
.map(msg => new ProducerRecord(topicName, "obs", A(msg.toString)))
.bufferIntrospective(1024)
.consumeWith(producer)

val listT = consumer
.map(_.value())
.toListL

val (result, _) = Await.result(Task.parZip2(listT, pushT).runToFuture, 60.seconds)
assert(result.map(_.value.toInt).sum === (0 until count).filter(_ % 2 == 0).sum)
}
}

}

case class A(value: String)

class ASerializer extends KafkaSerializer[A] {
override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()

override def serialize(topic: String, data: A): Array[Byte] =
if (data == null) null else data.value.getBytes

override def close(): Unit = ()
}

class ADeserializer extends KafkaDeserializer[A] {
override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()

override def deserialize(topic: String, data: Array[Byte]): A = if (data == null) null else A(new String(data))

override def close(): Unit = ()
}

class AFailingSerializer extends ASerializer {
override def serialize(topic: String, data: A): Array[Byte] = throw new RuntimeException("fail")
}

class AHalfFailingSerializer extends ASerializer {

override def serialize(topic: String, data: A): Array[Byte] = {
if (data.value.toInt % 2 == 0) super.serialize(topic, data)
else throw new RuntimeException("fail")
}
}
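
As a quick sanity check, the serializer pair above round-trips a value without a broker, using only the classes defined in this file:

// Round-trip smoke check for ASerializer/ADeserializer; no Kafka needed.
val bytes = (new ASerializer).serialize("any-topic", A("42"))
val back  = (new ADeserializer).deserialize("any-topic", bytes)
assert(back == A("42")) // holds by case-class equality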
