Messaging blocking or long running tasks
Daniel Kec edited this page Jul 28, 2022 · 1 revision
Blocking inside a Reactive Messaging method is generally a bad idea. The thread invoking the messaging method can be an event-loop worker, so blocking it risks deadlocks or degraded overall performance. However, we don't need to spin off our own threads.
MicroProfile Reactive Messaging integrates with MicroProfile Fault Tolerance, so it is possible to use @Asynchronous for offloading blocking tasks from upstream threads.
@Incoming("fromKafka")
@Acknowledgment(Acknowledgment.Strategy.MANUAL)
@Asynchronous   // run the method off the upstream thread
@Timeout(4000)  // default unit is milliseconds
@Fallback(fallbackMethod = "toDLQ", applyOn = {RuntimeException.class, IllegalStateException.class})
public CompletionStage<Void> receiveMessage(Message<String> msg) {
    System.out.println("Consuming " + msg.getPayload());
    if ("3".equals(msg.getPayload())) {
        throw new RuntimeException("BOOM!");
    }
    System.out.println("Consumed " + msg.getPayload() + " successfully!");
    return msg.ack();
}

// Fallback method: same parameter and return types as receiveMessage
CompletionStage<Void> toDLQ(Message<String> msg) {
    System.out.println("Sending " + msg.getPayload() + " to DLQ");
    // publisher is declared elsewhere (not shown here)
    publisher.emit(msg); // Don't ack manually, outgoing connector does that when broker acks reception
    return CompletableFuture.completedFuture(null);
}
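For intuition about what the combination of @Asynchronous, @Timeout and @Fallback in the example above achieves, here is a rough plain-JDK sketch of the same pattern: run the blocking work on a separate pool, bound it with a timeout, and divert failures to a fallback. This is only an analogy and not how Helidon or MicroProfile Fault Tolerance implements these annotations. The class `OffloadSketch`, its worker pool, the methods `blockingConsume`, `receive` and `toDlq`, and the `"slow"` payload are all hypothetical names made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class OffloadSketch {

    // Hypothetical worker pool standing in for the threads the container would provide.
    // Daemon threads let the JVM exit even if a timed-out task is still sleeping.
    private static final ExecutorService WORKERS = Executors.newFixedThreadPool(2, r -> {
        Thread t = new Thread(r, "offload-worker");
        t.setDaemon(true);
        return t;
    });

    // Simulated blocking task: fails on "3", takes one second on "slow".
    static String blockingConsume(String payload) {
        if ("3".equals(payload)) {
            throw new IllegalStateException("BOOM!");
        }
        if ("slow".equals(payload)) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return "consumed " + payload;
    }

    // Simulated dead-letter handling used as the fallback.
    static String toDlq(String payload) {
        return "dlq " + payload;
    }

    // Offload the blocking call, bound it with a timeout, fall back on any failure.
    static CompletionStage<String> receive(String payload, long timeoutMillis) {
        return CompletableFuture
                .supplyAsync(() -> blockingConsume(payload), WORKERS) // caller thread is not blocked
                .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS)      // fails the stage if too slow
                .exceptionally(t -> toDlq(payload));                  // failure or timeout goes to fallback
    }

    public static void main(String[] args) {
        System.out.println(receive("1", 2000).toCompletableFuture().join());   // consumed 1
        System.out.println(receive("3", 2000).toCompletableFuture().join());   // dlq 3
        System.out.println(receive("slow", 100).toCompletableFuture().join()); // dlq slow
    }
}
```

In this sketch a timed-out task keeps running on its worker thread after the fallback has produced a result; the annotation-based approach leaves that kind of bookkeeping to the container, which is one reason to prefer it over hand-rolled threading.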