diff --git a/pom.xml b/pom.xml
index eb21ba5..f450d01 100644
--- a/pom.xml
+++ b/pom.xml
@@ -88,7 +88,7 @@
no.shhsoft
k3a-embedded
- 0.3.2+${kafka.version}
+ 0.4.0+${kafka.version}
test
diff --git a/src/test/java/no/statnett/k3alagexporter/itest/K3aLagExporterIT.java b/src/test/java/no/statnett/k3alagexporter/itest/K3aLagExporterIT.java
index 337c499..410be71 100644
--- a/src/test/java/no/statnett/k3alagexporter/itest/K3aLagExporterIT.java
+++ b/src/test/java/no/statnett/k3alagexporter/itest/K3aLagExporterIT.java
@@ -1,6 +1,7 @@
package no.statnett.k3alagexporter.itest;
import no.shhsoft.k3aembedded.K3aEmbedded;
+import no.shhsoft.k3aembedded.K3aTestUtils;
import no.statnett.k3alagexporter.ClusterLagCollector;
import no.statnett.k3alagexporter.model.ClusterData;
import no.statnett.k3alagexporter.model.ConsumerGroupData;
@@ -13,12 +14,9 @@
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.IntegerSerializer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
@@ -57,8 +55,8 @@ public void shouldDetectLag() {
lagCollector = new ClusterLagCollector(CLUSTER_NAME,
null, null, null, null,
getMinimalConsumerConfig(), getMinimalAdminConfig());
- try (final Producer producer = getProducer()) {
- try (final Consumer consumer = getConsumer(CONSUMER_GROUP_ID)) {
+ try (final Producer producer = getProducer()) {
+ try (final Consumer consumer = getConsumer(CONSUMER_GROUP_ID)) {
consumer.subscribe(Collections.singleton(TOPIC));
produce(producer);
int consumedValue = consume(consumer);
@@ -87,8 +85,8 @@ private void assertLag(final int expected) {
assertEquals(expected, consumerGroupData.getLag(), 0.00001);
}
- private void produce(final Producer producer) {
- final ProducerRecord record = new ProducerRecord<>(TOPIC, 0, ++lastProducedValue);
+ private void produce(final Producer producer) {
+ final ProducerRecord record = new ProducerRecord<>(TOPIC, 0, String.valueOf(++lastProducedValue));
try {
producer.send(record, (metadata, exception) -> {
if (exception != null) {
@@ -102,36 +100,22 @@ private void produce(final Producer producer) {
producer.flush();
}
- private int consume(final Consumer consumer) {
+ private int consume(final Consumer consumer) {
int lastValue = -1;
- final ConsumerRecords records = consumer.poll(Duration.ofMillis(1000));
- for (final ConsumerRecord record : records) {
- lastValue = record.value();
+ final ConsumerRecords records = consumer.poll(Duration.ofMillis(1000));
+ for (final ConsumerRecord record : records) {
+ lastValue = Integer.valueOf(record.value());
consumer.commitAsync();
}
return lastValue;
}
- public Producer getProducer() {
- final Map map = getCommonConfig();
- map.put(ProducerConfig.ACKS_CONFIG, "all");
- map.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "3000");
- map.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "3000");
- map.put(ProducerConfig.LINGER_MS_CONFIG, "0");
- map.put(ProducerConfig.RETRIES_CONFIG, "0");
- map.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
- map.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
- return new KafkaProducer<>(map);
+ public Producer getProducer() {
+ return new KafkaProducer<>(K3aTestUtils.producerProps(broker));
}
- public Consumer getConsumer(final String consumerGroupId) {
- final Map map = getCommonConfig();
- map.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
- map.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- map.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
- map.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
- map.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
- return new KafkaConsumer<>(map);
+ public Consumer getConsumer(final String consumerGroupId) {
+ return new KafkaConsumer<>(K3aTestUtils.consumerProps(consumerGroupId, false, broker));
}
public Map getMinimalAdminConfig() {