diff --git a/integrasjon/kafka-properties/README.MD b/integrasjon/kafka-properties/README.MD
index 44acadd02..bd3ecf0fb 100644
--- a/integrasjon/kafka-properties/README.MD
+++ b/integrasjon/kafka-properties/README.MD
@@ -1,4 +1,12 @@
# Kafka - Aiven - oppsett
-Denne modulen inneholder en støttemetode for oppsett av Kafka Producers og Streams + standard String-serdes for topics som bruker JSON.
+Denne modulen inneholder støttemetoder for oppsett av Kafka Producers og Consumer + String-standard for topics som bruker JSON.
+
+Producer:
+* Enten opprett en KafkaSender for hver topic og bruk send med key + message
+* Eller opprett global KafkaSender med topic = null og bruk send med key + message + topic
+
+Consumers
+* Håndterere (ofte Transactional) implementerer KafkaMessageHandler / KafkaStringMessageHandler
+* Applikasjonene definerer en Controllable som lager en KafkaConsumerManager av handlere og starter/stopper den
Enkelte applikasjoner konsumerer AVRO-topics - de må selv legge til avhengighet til confluent.io og sette opp Schema Registry og deserialisering av Avro
diff --git a/integrasjon/kafka-properties/pom.xml b/integrasjon/kafka-properties/pom.xml
index 4362e01a1..279c7f331 100644
--- a/integrasjon/kafka-properties/pom.xml
+++ b/integrasjon/kafka-properties/pom.xml
@@ -29,11 +29,6 @@
kafka-clients
provided
-
- org.apache.kafka
- kafka-streams
- provided
-
diff --git a/integrasjon/kafka-properties/src/main/java/no/nav/vedtak/felles/integrasjon/kafka/KafkaConsumerManager.java b/integrasjon/kafka-properties/src/main/java/no/nav/vedtak/felles/integrasjon/kafka/KafkaConsumerManager.java
index ef8335c55..c8fb9cb35 100644
--- a/integrasjon/kafka-properties/src/main/java/no/nav/vedtak/felles/integrasjon/kafka/KafkaConsumerManager.java
+++ b/integrasjon/kafka-properties/src/main/java/no/nav/vedtak/felles/integrasjon/kafka/KafkaConsumerManager.java
@@ -2,28 +2,29 @@
import java.time.Duration;
import java.time.LocalDateTime;
-import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.errors.WakeupException;
public class KafkaConsumerManager {
private static final Duration CLOSE_TIMEOUT = Duration.ofSeconds(10);
- private final List> handlers;
- private final List> consumers = new ArrayList<>();
+ private final List> consumers;
- public KafkaConsumerManager(List> handlers) {
- this.handlers = handlers;
+
+ public KafkaConsumerManager(KafkaMessageHandler handler) {
+ this(List.of(handler));
+ }
+
+ public KafkaConsumerManager(List extends KafkaMessageHandler> handlers) {
+ this.consumers = handlers.stream().map(KafkaConsumerLoop::new).toList();
}
public void start(BiConsumer errorlogger) {
- consumers.addAll(handlers.stream().map(KafkaConsumerLoop::new).toList());
consumers.forEach(c -> {
var ct = new Thread(c, "KC-" + c.handler().groupId());
ct.setUncaughtExceptionHandler((t, e) -> { errorlogger.accept(c.handler().topic(), e); stop(); });
@@ -53,7 +54,7 @@ public boolean allStopped() {
}
public String topicNames() {
- return handlers.stream().map(KafkaMessageHandler::topic).collect(Collectors.joining(","));
+ return consumers.stream().map(KafkaConsumerLoop::handler).map(KafkaMessageHandler::topic).collect(Collectors.joining(","));
}
private record KafkaConsumerCloser(List> consumers) implements Runnable {
@@ -66,7 +67,7 @@ public void run() {
public static class KafkaConsumerLoop implements Runnable {
private static final Duration POLL_TIMEOUT = Duration.ofMillis(100);
- private static final Duration CLOSE_TIMEOUT = Duration.ofSeconds(10);
+
private enum ConsumerState { UNINITIALIZED, RUNNING, STOPPING, STOPPED }
private static final int RUNNING = ConsumerState.RUNNING.hashCode();
@@ -77,23 +78,25 @@ private enum ConsumerState { UNINITIALIZED, RUNNING, STOPPING, STOPPED }
public KafkaConsumerLoop(KafkaMessageHandler handler) {
this.handler = handler;
}
+
+ // Implementert som at-least-once - krever passe idempotente handleRecord og regner med at de er Transactional (commit hvert kall)
+ // Hvis man vil komplisere ting så kan gå for exactly-once - håndtere OffsetCommit (set property ENABLE_AUTO_COMMIT_CONFIG false)
+ // Man må da være bevisst på samspill DB-commit og Offset-commit - lage en Transactional handleRecords for alle som er pollet.
+ // handleRecords må ta inn ConsumerRecords (alle pollet) og 2 callbacks som a) legger til konsumert og b) kaller commitAsync(konsumert)
+ // Dessuten må man catche WakeupException og andre exceptions og avstemme håndtering (OffsetCommit) med DB-TX-Commit
@Override
public void run() {
try(var key = handler.keyDeserializer().get(); var value = handler.valueDeserializer().get()) {
- var props = KafkaProperties.forConsumerGenericValue(handler.groupId(), key, value, handler.autoOffsetReset());
+ var props = KafkaProperties.forConsumerGenericValue(handler.groupId(), key, value, handler.autoOffsetReset().orElse(null));
consumer = new KafkaConsumer<>(props, key, value);
consumer.subscribe(List.of(handler.topic()));
running.set(RUNNING);
while (running.get() == RUNNING) {
var records = consumer.poll(POLL_TIMEOUT);
- // Hvis man vil komplisere ting så kan man håndtere både OffsetCommit og DBcommit i en Transcational handleRecords.
- // handleRecords må ta inn alle som er pollet (records) og 2 callbacks som a) legger til konsumert og b) commitAsync(konsumert)
for (var record : records) {
handler.handleRecord(record.key(), record.value());
}
}
- } catch (WakeupException e) {
- // ignore for shutdown
} finally {
if (consumer != null) {
consumer.close(CLOSE_TIMEOUT);
@@ -108,9 +111,7 @@ public void shutdown() {
} else {
running.set(ConsumerState.STOPPED.hashCode());
}
- if (consumer != null) {
- consumer.wakeup();
- }
+ // Kan vurdere consumer.wakeup() + håndtere WakeupException ovenfor - men har utelatt til fordel for en tilstand og polling med kort timeout
}
public KafkaMessageHandler handler() {
diff --git a/integrasjon/kafka-properties/src/main/java/no/nav/vedtak/felles/integrasjon/kafka/KafkaProperties.java b/integrasjon/kafka-properties/src/main/java/no/nav/vedtak/felles/integrasjon/kafka/KafkaProperties.java
index 96449fe18..c3d0ef839 100644
--- a/integrasjon/kafka-properties/src/main/java/no/nav/vedtak/felles/integrasjon/kafka/KafkaProperties.java
+++ b/integrasjon/kafka-properties/src/main/java/no/nav/vedtak/felles/integrasjon/kafka/KafkaProperties.java
@@ -1,6 +1,5 @@
package no.nav.vedtak.felles.integrasjon.kafka;
-import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
@@ -13,17 +12,7 @@
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;
-import org.apache.kafka.streams.state.RocksDBConfigSetter;
-import org.apache.kafka.streams.state.internals.BlockBasedTableConfigWithAccessibleCache;
-import org.rocksdb.BloomFilter;
-import org.rocksdb.LRUCache;
-import org.rocksdb.Options;
import no.nav.foreldrepenger.konfig.Environment;
@@ -55,23 +44,18 @@ public static Properties forProducer() {
return props;
}
- // Alle som konsumerer Json-meldinger
- public static Properties forConsumerStringValue(String groupId) {
- return forConsumerGenericValue(groupId, new StringDeserializer(), new StringDeserializer(), Optional.empty());
- }
-
- public static Properties forConsumerGenericValue(String groupId, Deserializer valueKey, Deserializer valueSerde, Optional offsetReset) {
+ public static Properties forConsumerGenericValue(String groupId, Deserializer keyDeserializer, Deserializer valueDeserializer, OffsetResetStrategy offsetReset) {
final Properties props = new Properties();
props.put(CommonClientConfigs.GROUP_ID_CONFIG, groupId);
props.put(CommonClientConfigs.CLIENT_ID_CONFIG, generateClientId());
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, getAivenConfig(AivenProperty.KAFKA_BROKERS));
- offsetReset.ifPresent(or -> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, or.toString()));
+ Optional.ofNullable(offsetReset).ifPresent(or -> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, or.toString()));
putSecurity(props);
- props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, valueKey.getClass());
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueSerde.getClass());
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass());
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass());
// Polling
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100"); // Unngå store Tx dersom alle prosesseres innen samme Tx. Default 500
@@ -80,33 +64,13 @@ public static Properties forConsumerGenericValue(String groupId, Deseriali
return props;
}
- // Alle som konsumerer Json-meldinger
- public static Properties forStreamsStringValue(String applicationId) {
- return forStreamsGenericValue(applicationId, Serdes.String());
- }
-
- public static Properties forStreamsGenericValue(String applicationId, Serde valueSerde) {
- final Properties props = new Properties();
-
- props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
- props.put(StreamsConfig.CLIENT_ID_CONFIG, generateClientId());
- props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, getAivenConfig(AivenProperty.KAFKA_BROKERS));
-
- putSecurity(props);
-
- props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
- props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, valueSerde.getClass());
- props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndFailExceptionHandler.class);
-
- props.put(StreamsConfig.DEFAULT_DSL_STORE_CONFIG, StreamsConfig.IN_MEMORY);
- props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, StreamsRocksReadOnly.class);
-
- // Polling
- props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "200");
- props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "60000");
+ /*
+ * Streams-config er fjernet. Ved evt re-innføring husk at det trends read+write til topic for å unngå log-spamming.
+ * - APPLICATION_ID_CONFIG = tisvarende verdi som brukes for GROUP_ID_CONFIG (men kan ikke ha både streams og consumer)
+ * - KEY+VALUE SERDE - typisk Serdes.String() + derserialization_exception = LogAndFailExceptionHandler
+ * - Bør se på rocksdb-setting (se i historikk)
+ */
- return props;
- }
// Trengs kun for de som skal konsumere Avro. Ellers ikke
public static String getAvroSchemaRegistryURL() {
@@ -148,26 +112,4 @@ private static void putSecurity(Properties props) {
}
}
- public static class StreamsRocksReadOnly implements RocksDBConfigSetter {
-
- @Override
- public void setConfig(final String storeName, final Options options, final Map configs) {
-
- BlockBasedTableConfigWithAccessibleCache tableConfig = new BlockBasedTableConfigWithAccessibleCache();
- tableConfig.setBlockCache(new LRUCache(1024 * 1024L));
- tableConfig.setBlockSize(4096L);
- tableConfig.setFilterPolicy(new BloomFilter());
- tableConfig.setCacheIndexAndFilterBlocks(true);
- options.setTableFormatConfig(tableConfig);
-
- options.setWriteBufferSize(512 * 1024L);
- options.setMaxWriteBufferNumber(2);
- }
-
- @Override
- public void close(final String storeName, final Options options) {
- // NOOP
- }
- }
-
}
diff --git a/integrasjon/kafka-properties/src/main/java/no/nav/vedtak/felles/integrasjon/kafka/KafkaSender.java b/integrasjon/kafka-properties/src/main/java/no/nav/vedtak/felles/integrasjon/kafka/KafkaSender.java
index 2ad436f15..696d91ecc 100644
--- a/integrasjon/kafka-properties/src/main/java/no/nav/vedtak/felles/integrasjon/kafka/KafkaSender.java
+++ b/integrasjon/kafka-properties/src/main/java/no/nav/vedtak/felles/integrasjon/kafka/KafkaSender.java
@@ -1,5 +1,6 @@
package no.nav.vedtak.felles.integrasjon.kafka;
+import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
@@ -11,12 +12,19 @@ public class KafkaSender {
private final Producer producer;
private final String topic;
- public KafkaSender(Producer producer, String topic) {
- this.producer = producer;
+ public KafkaSender(String topic) {
+ this.producer = new KafkaProducer<>(KafkaProperties.forProducer());
this.topic = topic;
}
public RecordMetadata send(String key, String message) {
+ if (topic == null) {
+ throw kafkaPubliseringException("null", new IllegalArgumentException());
+ }
+ return send(key, message, this.topic);
+ }
+
+ public RecordMetadata send(String key, String message, String topic) {
try {
var record = new ProducerRecord<>(topic, key, message);
return producer.send(record).get();
@@ -24,11 +32,11 @@ var record = new ProducerRecord<>(topic, key, message);
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
- throw kafkaPubliseringException(e);
+ throw kafkaPubliseringException(topic, e);
}
}
- private IntegrasjonException kafkaPubliseringException(Exception e) {
+ private IntegrasjonException kafkaPubliseringException(String topic, Exception e) {
return new IntegrasjonException("F-KAFKA-925475", "Unexpected error when sending message to topic " + topic, e);
}