Skip to content

Commit

Permalink
changes
Browse files Browse the repository at this point in the history
  • Loading branch information
Andreas Mautsch committed Jul 5, 2024
1 parent 47f8731 commit d8e99f6
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ public class NatsCalendarConsumer {
private static final String CONSUMER_NAME = "Calendar";


public NatsCalendarConsumer(ConsumerUtil consumerUtil) {
consumerUtil.subscribe(CONSUMER_NAME, "patient.*",
public NatsCalendarConsumer(NatsSubscription natsSubscription) {
natsSubscription.create(CONSUMER_NAME, "patient.*",
(msg, eventData) -> process(msg.getSubject(), eventData));

consumerUtil.subscribe(CONSUMER_NAME, "practitioner.*",
natsSubscription.create(CONSUMER_NAME, "practitioner.*",
(msg, eventData) -> process(msg.getSubject(), eventData));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ public class NatsLoggerConsumer {
private final Logger log = LoggerFactory.getLogger(this.getClass());
private static final String CONSUMER_NAME = "Logger";

public NatsLoggerConsumer(ConsumerUtil consumerUtil) {
consumerUtil.subscribe(CONSUMER_NAME, "*.*",
public NatsLoggerConsumer(NatsSubscription natsSubscription) {
natsSubscription.create(CONSUMER_NAME, "*.*",
(msg, eventData) -> process(msg.getSubject(), eventData));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,18 @@


@Component
public class ConsumerUtil {
public class NatsSubscription {
private final Connection natsConnection;
private final ObjectMapper objectMapper;

public ConsumerUtil(Connection natsConnection) {
public NatsSubscription(Connection natsConnection) {
this.natsConnection = natsConnection;
this.objectMapper = new ObjectMapper(new CBORFactory()); //binary serializer for performance
}

//creates a durable consumer which msg being delivered even if consumer is not running,
//autoAck will automatically remove from queue when delivered, when renaming the consumer, it will just create a new one !
public void subscribe(String consumerName, String subject, EventMessageHandler handler ) {
public void create(String consumerName, String subject, EventMessageHandler handler ) {
try {
natsConnection.jetStream().subscribe(subject, natsConnection.createDispatcher(),
msg -> handler.onMessage(msg, getEvent(msg.getData())),
Expand Down

0 comments on commit d8e99f6

Please sign in to comment.