Skip to content

Commit

Permalink
embedded kafka bean (#251)
Browse files Browse the repository at this point in the history
embedded kayak for testing

Co-authored-by: Hans Petter Simonsen <[email protected]>
  • Loading branch information
hakonph and holymaloney authored Sep 6, 2021
1 parent 96b8976 commit 18cf28f
Show file tree
Hide file tree
Showing 10 changed files with 108 additions and 137 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,8 @@ public class OpprettForesporselOmDelingAvCv {
private final StillingFraNavProducerClient producerClient;
private final Nivaa4Client nivaa4Client;


@KafkaListener(topics = "${topic.inn.stillingFraNav}")
@Transactional
@KafkaListener(topics = "${topic.inn.stillingFraNav}")
public void createAktivitet(ForesporselOmDelingAvCv melding) {
if (delingAvCvService.aktivitetAlleredeOpprettetForBestillingsId(melding.getBestillingsId())) {
log.info("ForesporselOmDelingAvCv med bestillingsId={} har allerede en aktivitet", melding.getBestillingsId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,32 +6,35 @@
import no.nav.common.abac.Pep;
import no.nav.common.auth.context.AuthContextHolder;
import no.nav.common.auth.context.AuthContextHolderThreadLocal;
import no.nav.common.client.aktoroppslag.AktorOppslagClient;
import no.nav.common.job.leader_election.LeaderElectionClient;
import no.nav.common.kafka.producer.KafkaProducerClient;
import no.nav.common.metrics.MetricsClient;
import no.nav.common.sts.SystemUserTokenProvider;
import no.nav.common.utils.Credentials;
import no.nav.veilarbaktivitet.kvp.KvpClient;
import no.nav.veilarbaktivitet.mock.AktorOppslackMock;
import no.nav.veilarbaktivitet.mock.LocalH2Database;
import no.nav.veilarbaktivitet.mock.MetricsClientMock;
import no.nav.veilarbaktivitet.mock.PepMock;
import no.nav.veilarbaktivitet.nivaa4.Nivaa4Client;
import okhttp3.OkHttpClient;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.mockito.Mockito;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.data.transaction.ChainedTransactionManager;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.kafka.transaction.KafkaTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.test.EmbeddedKafkaBroker;

import javax.sql.DataSource;
import java.util.Map;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -117,5 +120,24 @@ public Pep veilarbPep() {
return new PepMock(null);
}

@Bean
public EmbeddedKafkaBroker embeddedKafka(@Value("${topic.inn.stillingFraNav}") String innTopic, @Value("${topic.ut.stillingFraNav}") String utTopic) {
return new EmbeddedKafkaBroker(1, true, 1, innTopic, utTopic);

}

@Bean
ProducerFactory<Object, Object> producerFactory(KafkaProperties kafkaProperties, EmbeddedKafkaBroker embeddedKafka) {
Map<String, Object> producerProperties = kafkaProperties.buildProducerProperties();
producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
return new DefaultKafkaProducerFactory<>(producerProperties);
}

@Bean
public ConsumerFactory<?, ?> consumerFactory(KafkaProperties kafkaProperties, EmbeddedKafkaBroker embeddedKafka) {
Map<String, Object> consumerProperties = kafkaProperties.buildConsumerProperties();
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
return new DefaultKafkaConsumerFactory<>(consumerProperties);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
@Slf4j
@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT)
@RunWith(SpringRunner.class)
@EmbeddedKafka(topics = {"${topic.inn.stillingFraNav}", "${topic.ut.stillingFraNav}"}, partitions = 1)
@AutoConfigureWireMock(port = 0)
@Transactional
public class AktivitetsplanITest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
*/
@SpringBootTest
@RunWith(SpringRunner.class)
@EmbeddedKafka
@AutoConfigureWireMock(port = 0)
public class AktivitetsplanRSTest {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import static org.junit.Assert.assertEquals;


@EmbeddedKafka(topics = "test", partitions = 1)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@AutoConfigureWireMock(port = 0)
@RunWith(SpringRunner.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.contract.wiremock.AutoConfigureWireMock;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionTemplate;

import java.util.*;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand All @@ -38,7 +38,6 @@
@Slf4j
@SpringBootTest
@RunWith(SpringRunner.class)
@EmbeddedKafka(topics = {"${topic.inn.stillingFraNav}","${topic.ut.stillingFraNav}"}, partitions = 1)
@AutoConfigureWireMock(port = 0)
@Transactional
public class AktivitetDAOTest {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package no.nav.veilarbaktivitet.stilling_fra_nav;

import com.github.tomakehurst.wiremock.client.WireMock;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import lombok.extern.slf4j.Slf4j;
import no.nav.veilarbaktivitet.avro.DelingAvCvRespons;
import no.nav.veilarbaktivitet.avro.TilstandEnum;
Expand All @@ -12,14 +10,11 @@
import no.nav.veilarbaktivitet.domain.AktivitetsplanDTO;
import no.nav.veilarbaktivitet.stilling_fra_nav.deling_av_cv.Arbeidssted;
import no.nav.veilarbaktivitet.stilling_fra_nav.deling_av_cv.ForesporselOmDelingAvCv;
import no.nav.veilarbaktivitet.util.ITestService;
import no.nav.veilarbaktivitet.util.MockBruker;
import no.nav.veilarbaktivitet.util.TestService;
import no.nav.veilarbaktivitet.util.WireMockUtil;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.assertj.core.api.SoftAssertions;
import org.junit.After;
import org.junit.Before;
Expand All @@ -31,34 +26,25 @@
import org.springframework.boot.web.server.LocalServerPort;
import org.springframework.cloud.contract.wiremock.AutoConfigureWireMock;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.test.context.junit4.SpringRunner;

import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import static org.junit.Assert.*;
import static org.springframework.kafka.test.utils.KafkaTestUtils.getSingleRecord;

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@RunWith(SpringRunner.class)
@EmbeddedKafka(topics = {"${topic.inn.stillingFraNav}", "${topic.ut.stillingFraNav}"}, partitions = 1)
@AutoConfigureWireMock(port = 0)
@Slf4j
public class DelingAvCvITest {

@Autowired
TestService testService;

@Autowired
EmbeddedKafkaBroker embeddedKafka;
ITestService testService;

@Autowired
JdbcTemplate jdbc;
Expand All @@ -72,29 +58,30 @@ public class DelingAvCvITest {
@Value("${topic.ut.stillingFraNav}")
private String utTopic;

@Value("${spring.kafka.properties.schema.registry.url}")
private String schemaRegistryUrl;

/***** Ekte bønner *****/

@Autowired
KafkaTemplate<String, ForesporselOmDelingAvCv> producer;

Consumer<String, DelingAvCvRespons> consumer;

@After
public void verify_no_unmatched() {
assertTrue(WireMock.findUnmatchedRequests().isEmpty());

consumer.unsubscribe();
consumer.close();
}

@Before
public void cleanupBetweenTests() {
DbTestUtils.cleanupTestDb(jdbc);

consumer = testService.createConsumer(utTopic);
}

@Test
public void happy_case() {
final Consumer<String, DelingAvCvRespons> consumer = createConsumer();

MockBruker mockBruker = MockBruker.happyBruker("1234", "4321");
WireMockUtil.stubBruker(mockBruker);

Expand Down Expand Up @@ -130,8 +117,6 @@ public void happy_case() {

@Test
public void ikke_under_oppfolging() {
final Consumer<String, DelingAvCvRespons> consumer = createConsumer();

MockBruker mockBruker = MockBruker.happyBruker("1234", "4321");
mockBruker.setUnderOppfolging(false);
WireMockUtil.stubBruker(mockBruker);
Expand All @@ -156,8 +141,6 @@ public void ikke_under_oppfolging() {

@Test
public void under_oppfolging_kvp() {
final Consumer<String, DelingAvCvRespons> consumer = createConsumer();

MockBruker mockBruker = MockBruker.happyBruker("1234", "4321");
mockBruker.setUnderOppfolging(true);
mockBruker.setErUnderKvp(true);
Expand All @@ -184,8 +167,6 @@ public void under_oppfolging_kvp() {

@Test
public void under_manuell_oppfolging() {
final Consumer<String, DelingAvCvRespons> consumer = createConsumer();

MockBruker mockBruker = MockBruker.happyBruker("1234", "4321");
mockBruker.setErManuell(true);
WireMockUtil.stubBruker(mockBruker);
Expand All @@ -211,8 +192,6 @@ public void under_manuell_oppfolging() {

@Test
public void reservert_i_krr() {
final Consumer<String, DelingAvCvRespons> consumer = createConsumer();

MockBruker mockBruker = MockBruker.happyBruker("1234", "4321");
mockBruker.setErReservertKrr(true);
WireMockUtil.stubBruker(mockBruker);
Expand All @@ -239,8 +218,6 @@ public void reservert_i_krr() {

@Test
public void mangler_nivaa4() {
final Consumer<String, DelingAvCvRespons> consumer = createConsumer();

MockBruker mockBruker = MockBruker.happyBruker("1234", "4321");
mockBruker.setHarBruktNivaa4(false);
WireMockUtil.stubBruker(mockBruker);
Expand All @@ -265,8 +242,6 @@ public void mangler_nivaa4() {

@Test
public void duplikat_bestillingsId_ignoreres() {
final Consumer<String, DelingAvCvRespons> consumer = createConsumer();

MockBruker mockBruker = MockBruker.happyBruker("1234", "4321");
WireMockUtil.stubBruker(mockBruker);

Expand All @@ -277,7 +252,6 @@ public void duplikat_bestillingsId_ignoreres() {

final ConsumerRecord<String, DelingAvCvRespons> record = getSingleRecord(consumer, utTopic, 5000);
DelingAvCvRespons value = record.value();

SoftAssertions.assertSoftly(assertions -> {
assertions.assertThat(value.getBestillingsId()).isEqualTo(bestillingsId);
assertions.assertThat(value.getAktorId()).isEqualTo(mockBruker.getAktorId());
Expand Down Expand Up @@ -323,34 +297,4 @@ static ForesporselOmDelingAvCv createMelding(String bestillingsId, String aktorI
.build();
}

private Consumer<String, DelingAvCvRespons> createConsumer() {
Consumer<String, DelingAvCvRespons> consumer = buildConsumer(
StringDeserializer.class,
KafkaAvroDeserializer.class
);
embeddedKafka.consumeFromEmbeddedTopics(consumer, utTopic);
consumer.commitSync(); // commitSync venter på async funksjonen av å lage consumeren, så man vet consumeren er satt opp
return consumer;
}

@SuppressWarnings("rawtypes")
private <K, V> Consumer<K, V> buildConsumer(Class<? extends Deserializer> keyDeserializer,
Class<? extends Deserializer> valueDeserializer) {
// Use the procedure documented at https://docs.spring.io/spring-kafka/docs/2.2.4.RELEASE/reference/#embedded-kafka-annotation

final Map<String, Object> consumerProps = KafkaTestUtils
.consumerProps(UUID.randomUUID().toString(), "true", embeddedKafka);
// Since we're pre-sending the messages to test for, we need to read from start of topic
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
// We need to match the ser/deser used in expected application config
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getName());
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getName());
consumerProps.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
consumerProps.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);

final DefaultKafkaConsumerFactory<K, V> consumerFactory =
new DefaultKafkaConsumerFactory<>(consumerProps);
return consumerFactory.createConsumer();
}

}
Loading

0 comments on commit 18cf28f

Please sign in to comment.