Commit

chore(deps): update k3a-embedded to 0.4.0
sverrehu committed Oct 24, 2023
1 parent 4ecb8e7 commit 9f29c9d
Showing 2 changed files with 14 additions and 30 deletions.
pom.xml: 1 addition & 1 deletion
@@ -88,7 +88,7 @@
<dependency>
<groupId>no.shhsoft</groupId>
<artifactId>k3a-embedded</artifactId>
- <version>0.3.2+${kafka.version}</version>
+ <version>0.4.0+${kafka.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
Second changed file (the integration test in package no.statnett.k3alagexporter.itest): 13 additions & 29 deletions
@@ -1,6 +1,7 @@
package no.statnett.k3alagexporter.itest;

import no.shhsoft.k3aembedded.K3aEmbedded;
+ import no.shhsoft.k3aembedded.K3aTestUtils;
import no.statnett.k3alagexporter.ClusterLagCollector;
import no.statnett.k3alagexporter.model.ClusterData;
import no.statnett.k3alagexporter.model.ConsumerGroupData;
@@ -13,12 +14,9 @@
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
- import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
- import org.apache.kafka.common.serialization.IntegerDeserializer;
- import org.apache.kafka.common.serialization.IntegerSerializer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
@@ -57,8 +55,8 @@ public void shouldDetectLag() {
lagCollector = new ClusterLagCollector(CLUSTER_NAME,
null, null, null, null,
getMinimalConsumerConfig(), getMinimalAdminConfig());
- try (final Producer<Integer, Integer> producer = getProducer()) {
- try (final Consumer<Integer, Integer> consumer = getConsumer(CONSUMER_GROUP_ID)) {
+ try (final Producer<Integer, String> producer = getProducer()) {
+ try (final Consumer<Integer, String> consumer = getConsumer(CONSUMER_GROUP_ID)) {
consumer.subscribe(Collections.singleton(TOPIC));
produce(producer);
int consumedValue = consume(consumer);
@@ -87,8 +85,8 @@ private void assertLag(final int expected) {
assertEquals(expected, consumerGroupData.getLag(), 0.00001);
}

- private void produce(final Producer<Integer, Integer> producer) {
- final ProducerRecord<Integer, Integer> record = new ProducerRecord<>(TOPIC, 0, ++lastProducedValue);
+ private void produce(final Producer<Integer, String> producer) {
+ final ProducerRecord<Integer, String> record = new ProducerRecord<>(TOPIC, 0, String.valueOf(++lastProducedValue));
try {
producer.send(record, (metadata, exception) -> {
if (exception != null) {
@@ -102,36 +100,22 @@ private void produce(final Producer<Integer, Integer> producer) {
producer.flush();
}

- private int consume(final Consumer<Integer, Integer> consumer) {
+ private int consume(final Consumer<Integer, String> consumer) {
int lastValue = -1;
- final ConsumerRecords<Integer, Integer> records = consumer.poll(Duration.ofMillis(1000));
- for (final ConsumerRecord<Integer, Integer> record : records) {
- lastValue = record.value();
+ final ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(1000));
+ for (final ConsumerRecord<Integer, String> record : records) {
+ lastValue = Integer.valueOf(record.value());
consumer.commitAsync();
}
return lastValue;
}

- public Producer<Integer, Integer> getProducer() {
- final Map<String, Object> map = getCommonConfig();
- map.put(ProducerConfig.ACKS_CONFIG, "all");
- map.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "3000");
- map.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "3000");
- map.put(ProducerConfig.LINGER_MS_CONFIG, "0");
- map.put(ProducerConfig.RETRIES_CONFIG, "0");
- map.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
- map.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
- return new KafkaProducer<>(map);
+ public Producer<Integer, String> getProducer() {
+ return new KafkaProducer<>(K3aTestUtils.producerProps(broker));
}

- public Consumer<Integer, Integer> getConsumer(final String consumerGroupId) {
- final Map<String, Object> map = getCommonConfig();
- map.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
- map.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- map.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
- map.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
- map.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
- return new KafkaConsumer<>(map);
+ public Consumer<Integer, String> getConsumer(final String consumerGroupId) {
+ return new KafkaConsumer<>(K3aTestUtils.consumerProps(consumerGroupId, false, broker));
}

public Map<String, Object> getMinimalAdminConfig() {
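Beyond the version bump, the commit replaces the hand-rolled producer and consumer configuration with the K3aTestUtils helpers from k3a-embedded 0.4.0, and switches record values from Integer to String. The sketch below is a minimal usage example, not the committed test class: it mirrors only the calls visible in this diff, assumes `broker` is the K3aEmbedded instance the test class already holds, and assumes producerProps/consumerProps return complete client configurations for that broker; the topic and group names are hypothetical.

import java.time.Duration;
import java.util.Collections;

import no.shhsoft.k3aembedded.K3aEmbedded;
import no.shhsoft.k3aembedded.K3aTestUtils;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

/* Sketch only: mirrors the calls visible in this diff, not the committed test class. */
final class K3aTestUtilsUsageSketch {

    private static final String TOPIC = "sketch-topic"; // hypothetical topic name

    static int produceAndConsumeOne(final K3aEmbedded broker) {
        // producerProps/consumerProps are the helpers used in the diff; their exact
        // contents (serializers, bootstrap servers) are assumed to match the embedded broker.
        try (Producer<Integer, String> producer = new KafkaProducer<>(K3aTestUtils.producerProps(broker));
             Consumer<Integer, String> consumer = new KafkaConsumer<>(K3aTestUtils.consumerProps("sketch-group", false, broker))) {
            consumer.subscribe(Collections.singleton(TOPIC));
            // Values are sent as Strings, matching the Integer/String generics in the updated test.
            producer.send(new ProducerRecord<>(TOPIC, 0, String.valueOf(42)));
            producer.flush();
            int lastValue = -1;
            for (final ConsumerRecord<Integer, String> record : consumer.poll(Duration.ofMillis(1000))) {
                lastValue = Integer.valueOf(record.value()); // parse back, as the updated consume() does
                consumer.commitAsync();
            }
            return lastValue;
        }
    }
}

Centralising the client configuration in K3aTestUtils is what removes the ProducerConfig and deserializer boilerplate deleted in the hunks above.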

0 comments on commit 9f29c9d
