Skip to content

Commit

Permalink
bean
Browse files Browse the repository at this point in the history
  • Loading branch information
Andreas Mautsch committed Jul 5, 2024
1 parent d8e99f6 commit 96a7de5
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,9 @@ public class NatsCalendarConsumer {
private final Logger log = LoggerFactory.getLogger(this.getClass());
private static final String CONSUMER_NAME = "Calendar";


public NatsCalendarConsumer(NatsSubscription natsSubscription) {
natsSubscription.create(CONSUMER_NAME, "patient.*",
(msg, eventData) -> process(msg.getSubject(), eventData));

natsSubscription.create(CONSUMER_NAME, "practitioner.*",
(msg, eventData) -> process(msg.getSubject(), eventData));
natsSubscription.create(CONSUMER_NAME, "patient.*", (msg, eventData) -> process(msg.getSubject(), eventData));
natsSubscription.create(CONSUMER_NAME, "practitioner.*", (msg, eventData) -> process(msg.getSubject(), eventData));
}

private void process(String key, EventData eventData) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@ public class NatsLoggerConsumer {
private static final String CONSUMER_NAME = "Logger";

public NatsLoggerConsumer(NatsSubscription natsSubscription) {
natsSubscription.create(CONSUMER_NAME, "*.*",
(msg, eventData) -> process(msg.getSubject(), eventData));
natsSubscription.create(CONSUMER_NAME, "*.*", (msg, eventData) -> process(msg.getSubject(), eventData));
}

private void process(String key, EventData eventData) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.nats.client.PushSubscribeOptions;
import io.nats.client.api.ConsumerConfiguration;
import org.goafabric.eventdispatcher.producer.EventData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.IOException;
Expand All @@ -17,7 +18,7 @@ public class NatsSubscription {
private final Connection natsConnection;
private final ObjectMapper objectMapper;

public NatsSubscription(Connection natsConnection) {
public NatsSubscription(@Autowired(required = false) Connection natsConnection) {
this.natsConnection = natsConnection;
this.objectMapper = new ObjectMapper(new CBORFactory()); //binary serializer for performance
}
Expand All @@ -26,9 +27,11 @@ public NatsSubscription(Connection natsConnection) {
//autoAck will automatically remove from queue when delivered, when renaming the consumer, it will just create a new one !
public void create(String consumerName, String subject, EventMessageHandler handler ) {
try {
natsConnection.jetStream().subscribe(subject, natsConnection.createDispatcher(),
msg -> handler.onMessage(msg, getEvent(msg.getData())),
true, createDurableOptions(consumerName, subject));
if (natsConnection != null) {
natsConnection.jetStream().subscribe(subject, natsConnection.createDispatcher(),
msg -> handler.onMessage(msg, getEvent(msg.getData())),
true, createDurableOptions(consumerName, subject));
}
} catch (Exception e) {
throw new IllegalStateException(e);
}
Expand Down
2 changes: 1 addition & 1 deletion src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ nats:
server: "localhost:4222"

#enable kafka or rabbitmq
spring.profiles.active: nats #kafka
spring.profiles.active: kafka #kafka

#logging
logging.pattern.level: "%5p tenantId=%X{tenantId:-}"
Expand Down

0 comments on commit 96a7de5

Please sign in to comment.