diff --git a/src/main/java/org/goafabric/eventdispatcher/consumer/nats/NatsCalendarConsumer.java b/src/main/java/org/goafabric/eventdispatcher/consumer/nats/NatsCalendarConsumer.java index ce106bb..3312335 100644 --- a/src/main/java/org/goafabric/eventdispatcher/consumer/nats/NatsCalendarConsumer.java +++ b/src/main/java/org/goafabric/eventdispatcher/consumer/nats/NatsCalendarConsumer.java @@ -12,13 +12,9 @@ public class NatsCalendarConsumer { private final Logger log = LoggerFactory.getLogger(this.getClass()); private static final String CONSUMER_NAME = "Calendar"; - public NatsCalendarConsumer(NatsSubscription natsSubscription) { - natsSubscription.create(CONSUMER_NAME, "patient.*", - (msg, eventData) -> process(msg.getSubject(), eventData)); - - natsSubscription.create(CONSUMER_NAME, "practitioner.*", - (msg, eventData) -> process(msg.getSubject(), eventData)); + natsSubscription.create(CONSUMER_NAME, "patient.*", (msg, eventData) -> process(msg.getSubject(), eventData)); + natsSubscription.create(CONSUMER_NAME, "practitioner.*", (msg, eventData) -> process(msg.getSubject(), eventData)); } private void process(String key, EventData eventData) { diff --git a/src/main/java/org/goafabric/eventdispatcher/consumer/nats/NatsLoggerConsumer.java b/src/main/java/org/goafabric/eventdispatcher/consumer/nats/NatsLoggerConsumer.java index 0d6aa4b..32b3cbc 100644 --- a/src/main/java/org/goafabric/eventdispatcher/consumer/nats/NatsLoggerConsumer.java +++ b/src/main/java/org/goafabric/eventdispatcher/consumer/nats/NatsLoggerConsumer.java @@ -13,8 +13,7 @@ public class NatsLoggerConsumer { private static final String CONSUMER_NAME = "Logger"; public NatsLoggerConsumer(NatsSubscription natsSubscription) { - natsSubscription.create(CONSUMER_NAME, "*.*", - (msg, eventData) -> process(msg.getSubject(), eventData)); + natsSubscription.create(CONSUMER_NAME, "*.*", (msg, eventData) -> process(msg.getSubject(), eventData)); } private void process(String key, EventData eventData) { diff --git a/src/main/java/org/goafabric/eventdispatcher/consumer/nats/NatsSubscription.java b/src/main/java/org/goafabric/eventdispatcher/consumer/nats/NatsSubscription.java index f83ead6..3237802 100644 --- a/src/main/java/org/goafabric/eventdispatcher/consumer/nats/NatsSubscription.java +++ b/src/main/java/org/goafabric/eventdispatcher/consumer/nats/NatsSubscription.java @@ -7,6 +7,7 @@ import io.nats.client.PushSubscribeOptions; import io.nats.client.api.ConsumerConfiguration; import org.goafabric.eventdispatcher.producer.EventData; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.io.IOException; @@ -17,7 +18,7 @@ public class NatsSubscription { private final Connection natsConnection; private final ObjectMapper objectMapper; - public NatsSubscription(Connection natsConnection) { + public NatsSubscription(@Autowired(required = false) Connection natsConnection) { this.natsConnection = natsConnection; this.objectMapper = new ObjectMapper(new CBORFactory()); //binary serializer for performance } @@ -26,9 +27,11 @@ public NatsSubscription(Connection natsConnection) { //autoAck will automatically remove from queue when delivered, when renaming the consumer, it will just create a new one ! public void create(String consumerName, String subject, EventMessageHandler handler ) { try { - natsConnection.jetStream().subscribe(subject, natsConnection.createDispatcher(), - msg -> handler.onMessage(msg, getEvent(msg.getData())), - true, createDurableOptions(consumerName, subject)); + if (natsConnection != null) { + natsConnection.jetStream().subscribe(subject, natsConnection.createDispatcher(), + msg -> handler.onMessage(msg, getEvent(msg.getData())), + true, createDurableOptions(consumerName, subject)); + } } catch (Exception e) { throw new IllegalStateException(e); } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index faf8eed..daf3a7c 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -39,7 +39,7 @@ nats: server: "localhost:4222" #enable kafka or rabbitmq -spring.profiles.active: nats #kafka +spring.profiles.active: kafka #kafka #logging logging.pattern.level: "%5p tenantId=%X{tenantId:-}"