diff --git a/src/main/java/org/goafabric/eventdispatcher/consumer/CalendarConsumer.java b/src/main/java/org/goafabric/eventdispatcher/consumer/CalendarConsumer.java index 4e4e217..4148441 100644 --- a/src/main/java/org/goafabric/eventdispatcher/consumer/CalendarConsumer.java +++ b/src/main/java/org/goafabric/eventdispatcher/consumer/CalendarConsumer.java @@ -18,12 +18,9 @@ public class CalendarConsumer implements LatchConsumer { private final Logger log = LoggerFactory.getLogger(this.getClass()); - static final String CONSUMER_NAME = "Calendar"; - public static Long CONSUMER_COUNT = 0L; - + private static final String CONSUMER_NAME = "Calendar"; private final CountDownLatch latch = new CountDownLatch(1); - @KafkaListener(groupId = CONSUMER_NAME, topics = {"patient", "practitioner"}) //only topics listed here will be autocreated public void processKafka(@Header(KafkaHeaders.RECEIVED_TOPIC) String topic, EventData eventData) { withTenantInfos(() -> process(topic, eventData)); @@ -44,7 +41,6 @@ private void process(String topic, EventData eventData) { } } } - CONSUMER_COUNT++; latch.countDown(); } diff --git a/src/main/java/org/goafabric/eventdispatcher/consumer/InvoiceConsumer.java b/src/main/java/org/goafabric/eventdispatcher/consumer/InvoiceConsumer.java index e67f141..f64f05e 100644 --- a/src/main/java/org/goafabric/eventdispatcher/consumer/InvoiceConsumer.java +++ b/src/main/java/org/goafabric/eventdispatcher/consumer/InvoiceConsumer.java @@ -18,7 +18,7 @@ public class InvoiceConsumer implements LatchConsumer { private final Logger log = LoggerFactory.getLogger(this.getClass()); - static final String CONSUMER_NAME = "Invoice"; + private static final String CONSUMER_NAME = "Invoice"; private final CountDownLatch latch = new CountDownLatch(1); @KafkaListener(groupId = CONSUMER_NAME, topicPattern = ".*", topics = {"condition", "chargeitem"}) diff --git a/src/main/java/org/goafabric/eventdispatcher/consumer/LoggerConsumer.java b/src/main/java/org/goafabric/eventdispatcher/consumer/LoggerConsumer.java index f4652af..591555c 100644 --- a/src/main/java/org/goafabric/eventdispatcher/consumer/LoggerConsumer.java +++ b/src/main/java/org/goafabric/eventdispatcher/consumer/LoggerConsumer.java @@ -18,7 +18,7 @@ public class LoggerConsumer implements LatchConsumer { private final Logger log = LoggerFactory.getLogger(this.getClass()); - static final String CONSUMER_NAME = "Logger"; + private static final String CONSUMER_NAME = "Logger"; private final CountDownLatch latch = new CountDownLatch(1); @KafkaListener(groupId = CONSUMER_NAME, topicPattern = ".*") diff --git a/src/test/java/org/goafabric/eventdispatcher/service/producer/EventProducerLoadNRIT.java b/src/test/java/org/goafabric/eventdispatcher/service/producer/EventProducerLoadNRIT.java index 58083b0..76f2ef7 100644 --- a/src/test/java/org/goafabric/eventdispatcher/service/producer/EventProducerLoadNRIT.java +++ b/src/test/java/org/goafabric/eventdispatcher/service/producer/EventProducerLoadNRIT.java @@ -1,6 +1,5 @@ package org.goafabric.eventdispatcher.service.producer; -import org.goafabric.eventdispatcher.consumer.CalendarConsumer; import org.goafabric.eventdispatcher.producer.EventProducer; import org.goafabric.eventdispatcher.service.controller.dto.ChangeEvent; import org.goafabric.eventdispatcher.service.controller.dto.DbOperation; @@ -35,7 +34,7 @@ public void testProduce() { eventProducer.produce(createEvent(createPatient(), DbOperation.CREATE)); eventProducer.produce(createEvent(createPatient(), DbOperation.CREATE)); } - long count = CalendarConsumer.CONSUMER_COUNT; + long count = TestConsumer.CONSUMER_COUNT; log.info("iteration for {} s, events processed {}, events/s {}", duration, count, count / duration); } @@ -50,7 +49,7 @@ private ChangeEvent createEvent(Object object, DbOperation operation) { return new ChangeEvent( UUID.randomUUID().toString(), UUID.randomUUID().toString(), - "patient", + "test-topic", operation, "secret-service", object diff --git a/src/test/java/org/goafabric/eventdispatcher/service/producer/TestConsumer.java b/src/test/java/org/goafabric/eventdispatcher/service/producer/TestConsumer.java new file mode 100644 index 0000000..41784ec --- /dev/null +++ b/src/test/java/org/goafabric/eventdispatcher/service/producer/TestConsumer.java @@ -0,0 +1,47 @@ +package org.goafabric.eventdispatcher.service.producer; + +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.context.Context; +import org.goafabric.event.EventData; +import org.goafabric.eventdispatcher.consumer.LatchConsumer; +import org.goafabric.eventdispatcher.service.extensions.TenantContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.MDC; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.stereotype.Component; + +import java.util.concurrent.CountDownLatch; + +@Component +public class TestConsumer implements LatchConsumer { + private final Logger log = LoggerFactory.getLogger(this.getClass()); + + static final String CONSUMER_NAME = "Calendar"; + public static Long CONSUMER_COUNT = 0L; + + private final CountDownLatch latch = new CountDownLatch(1); + + + @KafkaListener(groupId = CONSUMER_NAME, topics = {"test-topic"}) //only topics listed here will be autocreated + public void processKafka(@Header(KafkaHeaders.RECEIVED_TOPIC) String topic, EventData eventData) { + withTenantInfos(() -> process(topic, eventData)); + } + + private void process(String topic, EventData eventData) { + log.info("processing test event"); + CONSUMER_COUNT++; + latch.countDown(); + } + + private static void withTenantInfos(Runnable runnable) { + Span.fromContext(Context.current()).setAttribute("tenant.id", TenantContext.getTenantId()); + MDC.put("tenantId", TenantContext.getTenantId()); + try { runnable.run(); } finally { MDC.remove("tenantId"); } + } + + @Override + public CountDownLatch getLatch() { return latch; } +}