Messaging redelivery
MicroProfile Reactive Messaging 1.0 defines the Message.ack() method for acknowledging to the producer/upstream that a message has been successfully consumed. Messaging methods support four ack strategies:
- MANUAL - you have to call ack() manually
- PRE_PROCESSING - message gets acked before your messaging method is invoked
- POST_PROCESSING - message gets acked after your messaging method is successfully invoked
- NONE - no ack is going to be performed
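To make the MANUAL strategy concrete, here is a minimal self-contained sketch; the Message interface below is a simplified stand-in for org.eclipse.microprofile.reactive.messaging.Message, not the real API, and the consumer logic is illustrative:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

public class ManualAckSketch {
    // Simplified stand-in for MicroProfile's Message<T> (illustrative, not the real interface)
    interface Message<T> {
        T getPayload();
        CompletionStage<Void> ack();
    }

    // With the MANUAL strategy nothing is acked for you; you call ack() yourself,
    // typically only after your business logic has succeeded
    static CompletionStage<Void> consume(Message<String> message) {
        String payload = message.getPayload();
        System.out.println("Consumed: " + payload);
        return message.ack();
    }

    public static void main(String[] args) {
        CompletableFuture<Void> acked = new CompletableFuture<>();
        Message<String> msg = new Message<>() {
            public String getPayload() { return "hello"; }
            public CompletionStage<Void> ack() {
                acked.complete(null); // upstream records the acknowledgment here
                return acked;
            }
        };
        consume(msg).toCompletableFuture().join();
        if (!acked.isDone()) throw new AssertionError("message should be acked");
        System.out.println("acked");
    }
}
```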
In the catastrophic scenario 💥 where message consumption wasn't successful, redelivery can be achieved ... by restarting your POD. Kubernetes will do that for you if your POD's health-check probe reports it as unhealthy. On the Helidon side you need to add the messaging health check:
<dependency>
<groupId>io.helidon.microprofile.messaging</groupId>
<artifactId>helidon-microprofile-messaging-health</artifactId>
</dependency>
It adds another check to the health report of your app:
{
"name": "messaging",
"state": "UP",
"status": "UP",
"data": {
"my-channel-1": "UP",
"my-channel-2": "UP"
}
}
When your messaging channel dies or completes (uncaught exception, errored reactive stream), the health check reports the overall health as DOWN and K8s restarts the POD for you, as it considers it crashed.
@Incoming("fromKafka")
@Acknowledgment(Acknowledgment.Strategy.POST_PROCESSING) // Message gets acked if method is invoked successfully
public void receiveMessage(String payload) {
    if ("exception".equals(payload)) {
        throw new RuntimeException("Kill the stream!"); // Uncaught exception kills the messaging channel
    }
}
Messaging works with various connectors, and each of them maps the ack call to its underlying client differently. For Kafka, the .ack() method call actually commits the offset of the consumed message: when you call .ack() on message number 9, you are effectively saying "Hey Kafka, I got all the messages up to and including number 9". So even if you never acked number 8, it gets acked as well. Since acking in reactive messaging is asynchronous, there is no "right time" to seek backward on the partition offset and achieve redelivery. We also can't continue with the next messages without committing the offset further. The clean way out is to kill the stream and restart the POD.
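The cumulative nature of Kafka offset commits described above can be sketched with a tiny self-contained simulation (the class below is an illustrative stand-in, not the Kafka client API):

```java
public class CumulativeCommitSketch {
    // Kafka stores a single committed offset per partition, not a per-message flag
    private long committedOffset = -1;

    // Acking message N commits offset N + 1, implicitly covering every earlier message
    void ack(long messageOffset) {
        committedOffset = Math.max(committedOffset, messageOffset + 1);
    }

    boolean isCovered(long messageOffset) {
        return messageOffset < committedOffset;
    }

    public static void main(String[] args) {
        CumulativeCommitSketch partition = new CumulativeCommitSketch();
        partition.ack(9); // ack message number 9 only
        // ...yet message 8 is now also considered consumed:
        if (!partition.isCovered(8) || !partition.isCovered(9)) {
            throw new AssertionError("cumulative commit should cover earlier offsets");
        }
        System.out.println("committed offset: " + partition.committedOffset);
    }
}
```

This is why there is no way to "un-ack" a single skipped message once a later offset has been committed.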
When redelivery is needed, order of consumption is not required, and no catastrophic scenario is happening, you can re-enqueue the message with your own logic. It may seem weird, but you can keep your own redelivery count within the message and base your logic on it. Using error queues is also a battle-proven practice:
private final SubmissionPublisher<Message<CloudEvent>> errorQueueEmitter = new SubmissionPublisher<>();

@Incoming("fromKafka")
@Acknowledgment(Acknowledgment.Strategy.MANUAL)
public CompletionStage<Void> consume(Message<CloudEvent> message) {
    CloudEvent event = message.getPayload();
    if (isPayloadValid(event)) {
        // do some business here
        return message.ack();
    } else {
        // send it to the error queue
        errorQueueEmitter.submit(Message.of(
                CloudEventBuilder.from(event)
                        .withExtension("cause", "invalid payload") // add some extra metadata
                        .build(),
                message::ack // Ack incoming message after error queue broker acks reception
        ));
        return CompletableFuture.completedStage(null);
    }
}
@Outgoing("to-error-queue")
public PublisherBuilder<Message<CloudEvent>> errorQueuePub() {
return ReactiveStreams
.fromPublisher(FlowAdapters.toPublisher(errorQueueEmitter));
}