Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(deps): update k3a-embedded to 0.4.0 #101

Merged
merged 2 commits into from
Oct 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@
<dependency>
<groupId>no.shhsoft</groupId>
<artifactId>k3a-embedded</artifactId>
<version>0.3.2+${kafka.version}</version>
<version>0.4.0+${kafka.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package no.statnett.k3alagexporter.itest;

import no.shhsoft.k3aembedded.K3aEmbedded;
import no.shhsoft.k3aembedded.K3aTestUtils;
import no.statnett.k3alagexporter.ClusterLagCollector;
import no.statnett.k3alagexporter.model.ClusterData;
import no.statnett.k3alagexporter.model.ConsumerGroupData;
Expand All @@ -13,12 +14,9 @@
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -57,8 +55,8 @@ public void shouldDetectLag() {
lagCollector = new ClusterLagCollector(CLUSTER_NAME,
null, null, null, null,
getMinimalConsumerConfig(), getMinimalAdminConfig());
try (final Producer<Integer, Integer> producer = getProducer()) {
try (final Consumer<Integer, Integer> consumer = getConsumer(CONSUMER_GROUP_ID)) {
try (final Producer<Integer, String> producer = new KafkaProducer<>(K3aTestUtils.producerProps(broker))) {
try (final Consumer<Integer, String> consumer = new KafkaConsumer<>(K3aTestUtils.consumerProps(CONSUMER_GROUP_ID, false, broker))) {
consumer.subscribe(Collections.singleton(TOPIC));
produce(producer);
int consumedValue = consume(consumer);
Expand Down Expand Up @@ -87,8 +85,8 @@ private void assertLag(final int expected) {
assertEquals(expected, consumerGroupData.getLag(), 0.00001);
}

private void produce(final Producer<Integer, Integer> producer) {
final ProducerRecord<Integer, Integer> record = new ProducerRecord<>(TOPIC, 0, ++lastProducedValue);
private void produce(final Producer<Integer, String> producer) {
final ProducerRecord<Integer, String> record = new ProducerRecord<>(TOPIC, 0, String.valueOf(++lastProducedValue));
try {
producer.send(record, (metadata, exception) -> {
if (exception != null) {
Expand All @@ -102,38 +100,16 @@ private void produce(final Producer<Integer, Integer> producer) {
producer.flush();
}

private int consume(final Consumer<Integer, Integer> consumer) {
private int consume(final Consumer<Integer, String> consumer) {
int lastValue = -1;
final ConsumerRecords<Integer, Integer> records = consumer.poll(Duration.ofMillis(1000));
for (final ConsumerRecord<Integer, Integer> record : records) {
lastValue = record.value();
final ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(1000));
for (final ConsumerRecord<Integer, String> record : records) {
lastValue = Integer.valueOf(record.value());
consumer.commitAsync();
}
return lastValue;
}

public Producer<Integer, Integer> getProducer() {
final Map<String, Object> map = getCommonConfig();
map.put(ProducerConfig.ACKS_CONFIG, "all");
map.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "3000");
map.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "3000");
map.put(ProducerConfig.LINGER_MS_CONFIG, "0");
map.put(ProducerConfig.RETRIES_CONFIG, "0");
map.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
map.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
return new KafkaProducer<>(map);
}

public Consumer<Integer, Integer> getConsumer(final String consumerGroupId) {
final Map<String, Object> map = getCommonConfig();
map.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
map.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
map.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
map.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
map.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
return new KafkaConsumer<>(map);
}

public Map<String, Object> getMinimalAdminConfig() {
return getCommonConfig();
}
Expand Down