From 9f29c9def79cfaa2b6eadd4bfa1221d3160483ba Mon Sep 17 00:00:00 2001 From: "Sverre H. Huseby" Date: Tue, 24 Oct 2023 15:02:07 +0200 Subject: [PATCH 1/2] chore(deps): update k3a-embedded to 0.4.0 --- pom.xml | 2 +- .../itest/K3aLagExporterIT.java | 42 ++++++------------- 2 files changed, 14 insertions(+), 30 deletions(-) diff --git a/pom.xml b/pom.xml index eb21ba5..f450d01 100644 --- a/pom.xml +++ b/pom.xml @@ -88,7 +88,7 @@ no.shhsoft k3a-embedded - 0.3.2+${kafka.version} + 0.4.0+${kafka.version} test diff --git a/src/test/java/no/statnett/k3alagexporter/itest/K3aLagExporterIT.java b/src/test/java/no/statnett/k3alagexporter/itest/K3aLagExporterIT.java index 337c499..410be71 100644 --- a/src/test/java/no/statnett/k3alagexporter/itest/K3aLagExporterIT.java +++ b/src/test/java/no/statnett/k3alagexporter/itest/K3aLagExporterIT.java @@ -1,6 +1,7 @@ package no.statnett.k3alagexporter.itest; import no.shhsoft.k3aembedded.K3aEmbedded; +import no.shhsoft.k3aembedded.K3aTestUtils; import no.statnett.k3alagexporter.ClusterLagCollector; import no.statnett.k3alagexporter.model.ClusterData; import no.statnett.k3alagexporter.model.ConsumerGroupData; @@ -13,12 +14,9 @@ import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.ByteArrayDeserializer; -import org.apache.kafka.common.serialization.IntegerDeserializer; -import org.apache.kafka.common.serialization.IntegerSerializer; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -57,8 +55,8 @@ public void shouldDetectLag() { lagCollector = new ClusterLagCollector(CLUSTER_NAME, null, null, null, null, getMinimalConsumerConfig(), getMinimalAdminConfig()); - try (final Producer producer = getProducer()) { - try (final Consumer consumer = getConsumer(CONSUMER_GROUP_ID)) { + try (final Producer producer = getProducer()) { + try (final Consumer consumer = getConsumer(CONSUMER_GROUP_ID)) { consumer.subscribe(Collections.singleton(TOPIC)); produce(producer); int consumedValue = consume(consumer); @@ -87,8 +85,8 @@ private void assertLag(final int expected) { assertEquals(expected, consumerGroupData.getLag(), 0.00001); } - private void produce(final Producer producer) { - final ProducerRecord record = new ProducerRecord<>(TOPIC, 0, ++lastProducedValue); + private void produce(final Producer producer) { + final ProducerRecord record = new ProducerRecord<>(TOPIC, 0, String.valueOf(++lastProducedValue)); try { producer.send(record, (metadata, exception) -> { if (exception != null) { @@ -102,36 +100,22 @@ private void produce(final Producer producer) { producer.flush(); } - private int consume(final Consumer consumer) { + private int consume(final Consumer consumer) { int lastValue = -1; - final ConsumerRecords records = consumer.poll(Duration.ofMillis(1000)); - for (final ConsumerRecord record : records) { - lastValue = record.value(); + final ConsumerRecords records = consumer.poll(Duration.ofMillis(1000)); + for (final ConsumerRecord record : records) { + lastValue = Integer.valueOf(record.value()); consumer.commitAsync(); } return lastValue; } - public Producer getProducer() { - final Map map = getCommonConfig(); - map.put(ProducerConfig.ACKS_CONFIG, "all"); - map.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "3000"); - map.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "3000"); - map.put(ProducerConfig.LINGER_MS_CONFIG, "0"); - map.put(ProducerConfig.RETRIES_CONFIG, "0"); - map.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); - map.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); - return new KafkaProducer<>(map); + public Producer getProducer() { + return new KafkaProducer<>(K3aTestUtils.producerProps(broker)); } - public Consumer getConsumer(final String consumerGroupId) { - final Map map = getCommonConfig(); - map.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId); - map.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - map.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); - map.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); - map.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); - return new KafkaConsumer<>(map); + public Consumer getConsumer(final String consumerGroupId) { + return new KafkaConsumer<>(K3aTestUtils.consumerProps(consumerGroupId, false, broker)); } public Map getMinimalAdminConfig() { From 590828b1b230ada314eb792bde83b2f08c4b2b6b Mon Sep 17 00:00:00 2001 From: "Sverre H. Huseby" Date: Tue, 24 Oct 2023 15:05:37 +0200 Subject: [PATCH 2/2] refactor: inline methods --- .../k3alagexporter/itest/K3aLagExporterIT.java | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/src/test/java/no/statnett/k3alagexporter/itest/K3aLagExporterIT.java b/src/test/java/no/statnett/k3alagexporter/itest/K3aLagExporterIT.java index 410be71..e1f7862 100644 --- a/src/test/java/no/statnett/k3alagexporter/itest/K3aLagExporterIT.java +++ b/src/test/java/no/statnett/k3alagexporter/itest/K3aLagExporterIT.java @@ -55,8 +55,8 @@ public void shouldDetectLag() { lagCollector = new ClusterLagCollector(CLUSTER_NAME, null, null, null, null, getMinimalConsumerConfig(), getMinimalAdminConfig()); - try (final Producer producer = getProducer()) { - try (final Consumer consumer = getConsumer(CONSUMER_GROUP_ID)) { + try (final Producer producer = new KafkaProducer<>(K3aTestUtils.producerProps(broker))) { + try (final Consumer consumer = new KafkaConsumer<>(K3aTestUtils.consumerProps(CONSUMER_GROUP_ID, false, broker))) { consumer.subscribe(Collections.singleton(TOPIC)); produce(producer); int consumedValue = consume(consumer); @@ -110,14 +110,6 @@ private int consume(final Consumer consumer) { return lastValue; } - public Producer getProducer() { - return new KafkaProducer<>(K3aTestUtils.producerProps(broker)); - } - - public Consumer getConsumer(final String consumerGroupId) { - return new KafkaConsumer<>(K3aTestUtils.consumerProps(consumerGroupId, false, broker)); - } - public Map getMinimalAdminConfig() { return getCommonConfig(); }