Skip to content

Commit

Permalink
changes
Browse files Browse the repository at this point in the history
  • Loading branch information
Andreas Mautsch committed Aug 4, 2024
1 parent 411d11b commit f6ca34f
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,9 @@
public class CalendarConsumer implements LatchConsumer {
private final Logger log = LoggerFactory.getLogger(this.getClass());

static final String CONSUMER_NAME = "Calendar";
public static Long CONSUMER_COUNT = 0L;

private static final String CONSUMER_NAME = "Calendar";
private final CountDownLatch latch = new CountDownLatch(1);


@KafkaListener(groupId = CONSUMER_NAME, topics = {"patient", "practitioner"}) //only topics listed here will be autocreated
public void processKafka(@Header(KafkaHeaders.RECEIVED_TOPIC) String topic, EventData eventData) {
withTenantInfos(() -> process(topic, eventData));
Expand All @@ -44,7 +41,6 @@ private void process(String topic, EventData eventData) {
}
}
}
CONSUMER_COUNT++;
latch.countDown();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
public class InvoiceConsumer implements LatchConsumer {
private final Logger log = LoggerFactory.getLogger(this.getClass());

static final String CONSUMER_NAME = "Invoice";
private static final String CONSUMER_NAME = "Invoice";
private final CountDownLatch latch = new CountDownLatch(1);

@KafkaListener(groupId = CONSUMER_NAME, topicPattern = ".*", topics = {"condition", "chargeitem"})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
public class LoggerConsumer implements LatchConsumer {
private final Logger log = LoggerFactory.getLogger(this.getClass());

static final String CONSUMER_NAME = "Logger";
private static final String CONSUMER_NAME = "Logger";
private final CountDownLatch latch = new CountDownLatch(1);

@KafkaListener(groupId = CONSUMER_NAME, topicPattern = ".*")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package org.goafabric.eventdispatcher.service.producer;

import org.goafabric.eventdispatcher.consumer.CalendarConsumer;
import org.goafabric.eventdispatcher.producer.EventProducer;
import org.goafabric.eventdispatcher.service.controller.dto.ChangeEvent;
import org.goafabric.eventdispatcher.service.controller.dto.DbOperation;
Expand Down Expand Up @@ -35,7 +34,7 @@ public void testProduce() {
eventProducer.produce(createEvent(createPatient(), DbOperation.CREATE));
eventProducer.produce(createEvent(createPatient(), DbOperation.CREATE));
}
long count = CalendarConsumer.CONSUMER_COUNT;
long count = TestConsumer.CONSUMER_COUNT;
log.info("iteration for {} s, events processed {}, events/s {}", duration, count, count / duration);
}

Expand All @@ -50,7 +49,7 @@ private ChangeEvent createEvent(Object object, DbOperation operation) {
return new ChangeEvent(
UUID.randomUUID().toString(),
UUID.randomUUID().toString(),
"patient",
"test-topic",
operation,
"secret-service",
object
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package org.goafabric.eventdispatcher.service.producer;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import org.goafabric.event.EventData;
import org.goafabric.eventdispatcher.consumer.LatchConsumer;
import org.goafabric.eventdispatcher.service.extensions.TenantContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import java.util.concurrent.CountDownLatch;

@Component
public class TestConsumer implements LatchConsumer {
private final Logger log = LoggerFactory.getLogger(this.getClass());

static final String CONSUMER_NAME = "Calendar";
public static Long CONSUMER_COUNT = 0L;

private final CountDownLatch latch = new CountDownLatch(1);


@KafkaListener(groupId = CONSUMER_NAME, topics = {"test-topic"}) //only topics listed here will be autocreated
public void processKafka(@Header(KafkaHeaders.RECEIVED_TOPIC) String topic, EventData eventData) {
withTenantInfos(() -> process(topic, eventData));
}

private void process(String topic, EventData eventData) {
log.info("processing test event");
CONSUMER_COUNT++;
latch.countDown();
}

private static void withTenantInfos(Runnable runnable) {
Span.fromContext(Context.current()).setAttribute("tenant.id", TenantContext.getTenantId());
MDC.put("tenantId", TenantContext.getTenantId());
try { runnable.run(); } finally { MDC.remove("tenantId"); }
}

@Override
public CountDownLatch getLatch() { return latch; }
}

0 comments on commit f6ca34f

Please sign in to comment.