Note
|
This repository contains the guide documentation source. To view the guide in published form, view it on the Open Liberty website. |
Learn how to use MicroProfile Reactive Messaging acknowledgement strategies to acknowledge messages.
You will learn how to acknowledge messages using MicroProfile Reactive Messaging acknowledgement. You’ll also learn how this works with Apache Kafka.
Temporal decoupling of services in a microservice-based architecture can be achieved using asynchronous communication. MicroProfile Reactive Messaging provides a way to build systems of microservices advocating responsiveness, location transparency, elasticity, resiliency to failure and temporal decoupling. This helps in enforcing asynchronous message passing between the different parts of the system. See MicroProfile Reactive Messaging Specification for more information.
To learn more about the MicroProfile Reactive Messaging, read Creating asynchronous reactive microservices using MicroProfile Reactive Messaging.
MicroProfile Reactive Messaging Acknowledgement ensures that in case of any system failure, no messages are lost. Messaging system like Apache Kafka differentiates between when a message is being delivered to a system and the processing of that message by that system being complete. If the system goes down, any messages which have been sent but not processed completely can then be sent again. Reactive messaging applications implement this using message acknowledgements.
The OpenLiberty Cafe application that you’ll be working with consists of six microservices : Bar
, Kitchen
, Order
, OpenLibertyCafe
, ServingWindow
, and Status
. It also uses Kafka to enable reactive messaging communication between the Producer and Consumer microservices over the messaging channels.
You’ll update the Bar
, Kitchen
, and Status
microservices to use MicroProfile Reactive Messaging acknowledgement strategies for message acknowledgement. These microservices run on Open Liberty.
The OpenLibertyCafe
microservice is a backend for frontend service.
It communicates with the backend microservices on the caller’s behalf.
The waitstaff places a request using the OpenLibertyCafe
microservice.
The Order
microservice consumes the request, produces order messages, and sends them to Kafka on the food
or beverage
channel depending on the type.
An order begins with a NEW
status. The Bar
and Kitchen
microservices illustrate how to handle the orders as a real cafe. They consume and process the order and update the status to IN_PROGRESS
and READY
consecutively. There’s a sleep operation in between each status to represent the order processing time, and the status updates are reported back to the Status
microservice via reactive messages on the Kafka.
The ServingWindow
microservice illustrates when the food and beverages are ready to be served. It consumes the READY
order from the Kitchen
and Bar
microservices. Once the order is served, the waitstaff can mark the order as COMPLETED
and the status is sent back to the Status
microservice as a message.
The Status
microservice receives status updates and provides queries for the waitstaff. All the orders are persisted here. Because persisting data is not the focus of this guide, the Status
microservice simply uses a Map object to store the data.
You will now build and run the microservices in Docker containers. You can learn more about containerizing microservices with Docker in the Containerizing microservices guide.
Install Docker by following the instructions on the official Docker documentation. Start your Docker environment.
MicroProfile Reactive Messaging acknowledgement consists of mainly four strategies namely NONE
, PRE_PROCESSING
, POST_PROCESSING
and MANUAL
.
Acknowledgement is an important part of message processing. Messages are either acknowledged explicitly, or implicitly by the MicroProfile Reactive Messaging implementation.
@Acknowledgement
for the @Incoming
messages is controlled by the org.eclipse.microprofile.reactive.messaging.Acknowledgment
annotation.
Default acknowledgement depends on the method signature. Each method signature implements different acknowledgement policies. If the Acknowledgment annotation is not set, the default policy is applied.
Different defaults and supported acknowledgement for each supported signature could be found at the Messaging acknowledgement table of the specification.
Methods annotated with only @Outgoing
do not support acknowledgement as they don’t receive any incoming Message
.
If the acknowledgement strategy is set to NONE
, no acknowledgment will be performed. The NONE
strategy indicates that the incoming message is not acknowledged and the acknowledgment of the outgoing message would not acknowledge the incoming message anymore. The NONE
strategy may be used for the protocols that do not support message acknowledgment.
If the acknowledgement strategy is set to PRE_PROCESSING
, the Reactive Messaging implementation acknowledges the message before the annotated method or processing is executed.
If an @Incoming
annotated method has an acknowledgement strategy defined as @Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING)
, the Reactive Messaging implementation is responsible for the acknowledgement of the message.
Replace theBarService
class.bar/src/main/java/io/openliberty/guides/bar/BarService.java
BarService.java
link:finish/bar/src/main/java/io/openliberty/guides/bar/BarService.java[role=include]
If the order is an item of drink, Bar
service receives an order on the @Incoming("beverageOrderConsume")
channel, processes it in receiveBeverageOrder()
method and sends back an IN_PROGRESS
status on the @Outgoing("beverageOrderPublishStatus")
channel to Kafka over MicroProfile Reactive Messaging channels. After a while, a READY
status will be sent to the @Outgoing("beverageOrderPublishStatus")
channel.
The receiveBeverageOrder()
method in Bar microservice matches the following signature.
@Incoming("in")
@Outgoing("out")
O method(I payload)
Based on the table mentioned in the specification , the default acknowledgement strategy would be POST_PROCESSING
. It means that the messages will be acknowledged when the annotated method executes completely. However when we change the strategy to PRE_PROCESSING
, the incoming message will be acknowledged even before the annotated method is called. It also means that the acknowledgment of the outgoing message would not acknowledge the incoming message anymore, as it’s already acknowledged. In case the service goes down when the message is being processed, the Kafka won’t resend the message if the service is restored as the Kafka already processed the message.
If the acknowledgement strategy is set to POST_PROCESSING
, the Reactive Messaging implementation acknowledges the message after the annotated method or processing is executed. When dealing with payloads, the POST_PROCESSING
strategy is the default strategy.
If an @Incoming
annotated method has an acknowledgement strategy defined as @Acknowledgment(Acknowledgment.Strategy.POST_PROCESSING)
, the Reactive Messaging implementation is responsible for the acknowledgement of the message.
Replace theKitchenService
class.kitchen/src/main/java/io/openliberty/guides/kitchen/KitchenService.java
KitchenService.java
link:finish/kitchen/src/main/java/io/openliberty/guides/kitchen/KitchenService.java[role=include]
If the order is an item of food, Kitchen
service receives an order on the @Incoming("foodOrderConsume")
channel, processes it in receiveFoodOrder()
method and sends back an IN_PROGRESS
status on the @Outgoing("foodOrderPublishStatus")
channel to Kafka over MicroProfile Reactive Messaging channels. After a while, a READY
status will be sent to the @Outgoing("foodOrderPublishStatus")
channel.
The receiveFoodOrder()
method in Kitchen microservice annotated with POST_PROCESSING
acknowledgement strategy indicates that the incoming message is acknowledged when the outgoing message is acknowledged. i.e. when the emitted data is acknowledged. In case the service goes down when the message is being processed, the Kafka will resend the message and the message will be reprocessed.
Check the specification to see whether reactive messaging does PRE_PROCESSING
or POST_PROCESSING
message acknowledgements for your method. If it does PRE_PROCESSING
consider changing to POST_PROCESSING
based on your application need.
If the acknowledgement strategy is set to MANUAL
, a user or a developer is responsible for the acknowledgement. It can be achieved by calling the Message.ack()
method, so that the Reactive Messaging implementation does not apply implicit acknowledgement.
Replace theStatusResource
class.status/src/main/java/io/openliberty/guides/status/StatusResource.java
StatusResource.java
link:finish/status/src/main/java/io/openliberty/guides/status/StatusResource.java[role=include]
status/microprofile-config.properties
link:finish/status/src/main/resources/META-INF/microprofile-config.properties[role=include]
The Status
microservice receives the message from the Bar
, Kitchen
, Order
, and ServingWindow
microservices over the @Incoming("updateStatus")
channel with its properties defined in the microprofile-config.properties
.
The updateStatus()
method in Status microservice annotated with MANUAL
acknowledgement strategy
indicates that the incoming message acknowledgement is managed by the user code. The order.ack()
method acknowledges the message once the method completes all it’s processing.
For the MANUAL
acknowledgement, transiting data needs to be wrapped in a Message
. The Message
class provides metadata and also allows messages to be acknowledged.
You will build and run the Bar
, Kitchen
, Order
, OpenLibertyCafe
, ServingWindow
, and Status
microservices in Docker containers.
Start your Docker environment.
To build the application, run the Maven install
and package
goals from the command line in the start
directory:
mvn -pl models install
mvn package
Run the following command to download or update the open-liberty
docker image to latest.
docker pull open-liberty
Run the following commands to containerize the microservices:
docker build -t bar:1.0-SNAPSHOT bar/.
docker build -t kitchen:1.0-SNAPSHOT kitchen/.
docker build -t openlibertycafe:1.0-SNAPSHOT openLibertyCafe/.
docker build -t order:1.0-SNAPSHOT order/.
docker build -t servingwindow:1.0-SNAPSHOT servingWindow/.
docker build -t status:1.0-SNAPSHOT status/.
Next, use the provided script to start the application in Docker containers. The script creates a network for the containers to communicate with each other. It also creates containers for Kafka, Zookeeper, and all of the microservices in the project.
./scripts/startContainers.sh
.\scripts\startContainers.bat
Once the application is up and running, you can access the application by making requests to the OpenLibertyCafe
endpoint using OpenAPI.
The services take some time to become available. Check out the service that you created at the http://localhost:9080/openapi/ui URL.
Place Orders
Make a POST
request to the /api/orders
endpoint. To make this request, expand the POST
endpoint on the UI, click the Try it out button. Copy the following example input into the text box, and then click the Execute
button. The POST
request adds a new food
and beverage
Order.
{
"tableId": "1",
"foodList": [
"burger"
],
"beverageList": [
"coke"
]
}
Check IN_PROGRESS Order Status
Next, make a GET
request to the /api/status
endpoint. To make this request, expand the GET
endpoint on the UI, click the Try it out button, and then click the Execute
button. The GET
request returns information about the Orders. The response should show that the Orders have an IN_PROGRESS
status. Just in case if you see a NEW
status then wait for a couple of seconds and click the Execute
button again.
[
{
"item": "burger",
"orderId": "0001",
"status": "IN_PROGRESS",
"tableId": "1",
"type": "FOOD"
},
{
"item": "coke",
"orderId": "0002",
"status": "IN_PROGRESS",
"tableId": "1",
"type": "BEVERAGE"
}
]
Check READY Order Status
Click Execute
again and you will see the response with a READY
status. Just in case if you see an IN_PROGRESS
status then wait for a couple of seconds and click the Execute
button again.
[
{
"item": "burger",
"orderId": "0001",
"status": "READY",
"tableId": "1",
"type": "FOOD"
},
{
"item": "coke",
"orderId": "0002",
"status": "READY",
"tableId": "1",
"type": "BEVERAGE"
}
]
Complete an Order
Make a POST
request to the /api/servingWindow/{orderId}
endpoint to complete an order. Expand the POST
endpoint on the UI, click the Try it out button. Copy the following orderId into the text box, and then click the Execute
button.
0002
The status for the orderId 0002
is changed from READY
to COMPLETED
and the updated status is cascaded to the Status
microservice.
Then, make a GET
request to the /api/status
endpoint again and verify that the status for the order with an orderId
of 0002
is now labelled as COMPLETED
.
[
{
"item": "burger",
"orderId": "0001",
"status": "READY",
"tableId": "1",
"type": "FOOD"
},
{
"item": "coke",
"orderId": "0002",
"status": "COMPLETED",
"tableId": "1",
"type": "BEVERAGE"
}
]
Finally, use the following script to stop the application:
./scripts/stopContainers.sh
.\scripts\stopContainers.bat
You have just developed an application using MicroProfile Reactive Messaging, Open Liberty and Kakfa.