Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

saga pattern example #5

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
<dependency>
<groupId>io.orkes.conductor</groupId>
<artifactId>orkes-conductor-client</artifactId>
<version>2.0.3</version>
<version>2.1.2</version>
</dependency>
</dependencies>

Expand Down
12 changes: 6 additions & 6 deletions src/main/java/io/orkes/example/saga/dao/BaseDAO.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public void createTables(String service) {
}

private void createBookingsTableCreationSqlStmt() {
String sql = "CREATE TABLE bookings (\n"
String sql = "CREATE TABLE IF NOT EXISTS bookings (\n"
+ " bookingId text PRIMARY KEY,\n"
+ " riderId integer NOT NULL,\n"
+ " driverId integer,\n"
Expand All @@ -64,7 +64,7 @@ private void createBookingsTableCreationSqlStmt() {
}

private void createRidersTableCreationSqlStmt() {
String sql = "CREATE TABLE riders (\n"
String sql = "CREATE TABLE IF NOT EXISTS riders (\n"
+ " id integer PRIMARY KEY AUTOINCREMENT,\n"
+ " name text NOT NULL,\n"
+ " contact text\n"
Expand All @@ -76,7 +76,7 @@ private void createRidersTableCreationSqlStmt() {
}

private void createDriversTableCreationSqlStmt() {
String sql = "CREATE TABLE drivers (\n"
String sql = "CREATE TABLE IF NOT EXISTS drivers (\n"
+ " id integer PRIMARY KEY AUTOINCREMENT,\n"
+ " name text NOT NULL,\n"
+ " contact text\n"
Expand All @@ -88,7 +88,7 @@ private void createDriversTableCreationSqlStmt() {
}

private void createAssignmentsTableCreationSqlStmt() {
String sql = "CREATE TABLE assignments (\n"
String sql = "CREATE TABLE IF NOT EXISTS assignments (\n"
+ " id integer PRIMARY KEY AUTOINCREMENT,\n"
+ " booking_id text NOT NULL,\n"
+ " driver_id integer NOT NULL,\n"
Expand All @@ -100,7 +100,7 @@ private void createAssignmentsTableCreationSqlStmt() {
}

private void createPaymentMethodsTableCreationSqlStmt() {
String sql = "CREATE TABLE payment_methods (\n"
String sql = "CREATE TABLE IF NOT EXISTS payment_methods (\n"
+ " id integer PRIMARY KEY AUTOINCREMENT,\n"
+ " rider_id integer NOT NULL,\n"
+ " details text NOT NULL\n"
Expand All @@ -112,7 +112,7 @@ private void createPaymentMethodsTableCreationSqlStmt() {
}

private void createPaymentsTableCreationSqlStmt() {
String sql = "CREATE TABLE payments (\n"
String sql = "CREATE TABLE IF NOT EXISTS payments (\n"
+ " payment_id text PRIMARY KEY,\n"
+ " booking_id text NOT NULL,\n"
+ " amount number NOT NULL,\n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.orkes.example.saga.pojos.Booking;
import io.orkes.example.saga.pojos.CabAssignment;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StringUtils;

import java.util.Random;
@Slf4j
Expand Down Expand Up @@ -41,8 +42,9 @@ public static int assignDriver(String bookingId) {
public static void cancelAssignment(String bookingId) {
Booking booking = BookingService.getBooking(bookingId);

if (booking.getBookingId().isEmpty()) {
if (booking.getBookingId() == null) {
log.error("Booking with id {} not found.", bookingId);
return;
}

cabAssignmentDAO.deactivateAssignment(bookingId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public static Payment createPayment(PaymentRequest paymentRequest) {
}
// Check for payment methods for the rider
// If exists, try to make the payment
else if (paymentMethod.getId() > 0) {
else if (paymentMethod.getId() >= 0) {
payment.setPaymentMethodId(paymentMethod.getId());
// Call external Payments API
if(makePayment(payment)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,7 @@ public Map<String, Object> startRideBookingWorkflow(BookingRequest bookingReques
}

Map<String, Object> inputData = new HashMap<>();
inputData.put("riderId", bookingRequest.getRiderId());
inputData.put("dropOffLocation", bookingRequest.getDropOffLocation());
inputData.put("pickUpLocation", bookingRequest.getPickUpLocation());
inputData.put("bookingRequest", bookingRequest);
request.setInput(inputData);

String workflowId = "";
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.orkes.example.saga.workers;

import com.netflix.conductor.common.metadata.tasks.TaskResult;
import com.netflix.conductor.sdk.workflow.task.InputParam;
import com.netflix.conductor.sdk.workflow.task.WorkerTask;
import io.orkes.example.saga.pojos.*;
import io.orkes.example.saga.service.BookingService;
Expand All @@ -21,7 +22,7 @@ public class ConductorWorkers {
* Note: Using this setting, up to 5 tasks will run in parallel, with tasks being polled every 200ms
*/
@WorkerTask(value = "book_ride", threadCount = 3, pollingInterval = 300)
public TaskResult checkForBookingRideTask(BookingRequest bookingRequest) {
public TaskResult checkForBookingRideTask(@InputParam("bookingRequest") BookingRequest bookingRequest) {
String bookingId = BookingService.createBooking(bookingRequest);

TaskResult result = new TaskResult();
Expand Down
16 changes: 7 additions & 9 deletions src/main/resources/cab_service_saga_booking_wf.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@
"name": "book_ride",
"taskReferenceName": "book_ride_ref",
"inputParameters": {
"riderId": "${workflow.input.riderId}",
"pickUpLocation": "${workflow.input.pickUpLocation}",
"dropOffLocation": "${workflow.input.dropOffLocation}"
"bookingRequest": "${workflow.input.bookingRequest}"
},
"type": "SIMPLE",
"decisionCases": {},
Expand Down Expand Up @@ -46,7 +44,7 @@
"taskReferenceName": "make_payment_ref",
"inputParameters": {
"bookingId": "${assign_driver_ref.output.bookingId}",
"riderId": "${workflow.input.riderId}"
"riderId": "${workflow.input.bookingRequest.riderId}"
},
"type": "SIMPLE",
"decisionCases": {},
Expand Down Expand Up @@ -91,11 +89,11 @@
"name": "notify_customer",
"taskReferenceName": "notify_customer_ref",
"inputParameters": {
"riderId": "${workflow.input.riderId}",
"riderId": "${workflow.input.bookingRequest.riderId}",
"driverId": "${assign_driver_ref.output.driverId}",
"dropOffLocation": "${workflow.input.dropOffLocation}",
"dropOffLocation": "${workflow.input.bookingRequest.dropOffLocation}",
"bookingId": "${book_ride_ref.output.bookingId}",
"pickUpLocation": "${workflow.input.pickUpLocation}"
"pickUpLocation": "${workflow.input.bookingRequest.pickUpLocation}"
},
"type": "SIMPLE",
"decisionCases": {},
Expand All @@ -117,8 +115,8 @@
"inputParameters": {
"bookingId": "${book_ride_ref.output.bookingId}",
"driverId": "${assign_driver_ref.output.driverId}",
"pickUp": "${workflow.input.pickUpLocation}",
"dropOff": "${workflow.input.dropOffLocation}"
"pickUp": "${workflow.input.bookingRequest.pickUpLocation}",
"dropOff": "${workflow.input.bookingRequest.dropOffLocation}"
},
"type": "SIMPLE",
"decisionCases": {},
Expand Down