diff --git a/pom.xml b/pom.xml index 75c5317..40c1cb3 100644 --- a/pom.xml +++ b/pom.xml @@ -48,7 +48,7 @@ io.orkes.conductor orkes-conductor-client - 2.0.3 + 2.1.2 diff --git a/src/main/java/io/orkes/example/saga/dao/BaseDAO.java b/src/main/java/io/orkes/example/saga/dao/BaseDAO.java index 6879020..e68821c 100644 --- a/src/main/java/io/orkes/example/saga/dao/BaseDAO.java +++ b/src/main/java/io/orkes/example/saga/dao/BaseDAO.java @@ -50,7 +50,7 @@ public void createTables(String service) { } private void createBookingsTableCreationSqlStmt() { - String sql = "CREATE TABLE bookings (\n" + String sql = "CREATE TABLE IF NOT EXISTS bookings (\n" + " bookingId text PRIMARY KEY,\n" + " riderId integer NOT NULL,\n" + " driverId integer,\n" @@ -64,7 +64,7 @@ private void createBookingsTableCreationSqlStmt() { } private void createRidersTableCreationSqlStmt() { - String sql = "CREATE TABLE riders (\n" + String sql = "CREATE TABLE IF NOT EXISTS riders (\n" + " id integer PRIMARY KEY AUTOINCREMENT,\n" + " name text NOT NULL,\n" + " contact text\n" @@ -76,7 +76,7 @@ private void createRidersTableCreationSqlStmt() { } private void createDriversTableCreationSqlStmt() { - String sql = "CREATE TABLE drivers (\n" + String sql = "CREATE TABLE IF NOT EXISTS drivers (\n" + " id integer PRIMARY KEY AUTOINCREMENT,\n" + " name text NOT NULL,\n" + " contact text\n" @@ -88,7 +88,7 @@ private void createDriversTableCreationSqlStmt() { } private void createAssignmentsTableCreationSqlStmt() { - String sql = "CREATE TABLE assignments (\n" + String sql = "CREATE TABLE IF NOT EXISTS assignments (\n" + " id integer PRIMARY KEY AUTOINCREMENT,\n" + " booking_id text NOT NULL,\n" + " driver_id integer NOT NULL,\n" @@ -100,7 +100,7 @@ private void createAssignmentsTableCreationSqlStmt() { } private void createPaymentMethodsTableCreationSqlStmt() { - String sql = "CREATE TABLE payment_methods (\n" + String sql = "CREATE TABLE IF NOT EXISTS payment_methods (\n" + " id integer PRIMARY KEY AUTOINCREMENT,\n" + " rider_id integer NOT NULL,\n" + " details text NOT NULL\n" @@ -112,7 +112,7 @@ private void createPaymentMethodsTableCreationSqlStmt() { } private void createPaymentsTableCreationSqlStmt() { - String sql = "CREATE TABLE payments (\n" + String sql = "CREATE TABLE IF NOT EXISTS payments (\n" + " payment_id text PRIMARY KEY,\n" + " booking_id text NOT NULL,\n" + " amount number NOT NULL,\n" diff --git a/src/main/java/io/orkes/example/saga/service/CabAssignmentService.java b/src/main/java/io/orkes/example/saga/service/CabAssignmentService.java index 01077c1..0fa264d 100644 --- a/src/main/java/io/orkes/example/saga/service/CabAssignmentService.java +++ b/src/main/java/io/orkes/example/saga/service/CabAssignmentService.java @@ -4,6 +4,7 @@ import io.orkes.example.saga.pojos.Booking; import io.orkes.example.saga.pojos.CabAssignment; import lombok.extern.slf4j.Slf4j; +import org.springframework.util.StringUtils; import java.util.Random; @Slf4j @@ -41,8 +42,9 @@ public static int assignDriver(String bookingId) { public static void cancelAssignment(String bookingId) { Booking booking = BookingService.getBooking(bookingId); - if (booking.getBookingId().isEmpty()) { + if (booking.getBookingId() == null) { log.error("Booking with id {} not found.", bookingId); + return; } cabAssignmentDAO.deactivateAssignment(bookingId); diff --git a/src/main/java/io/orkes/example/saga/service/PaymentService.java b/src/main/java/io/orkes/example/saga/service/PaymentService.java index 6016cc4..7b65b6d 100644 --- a/src/main/java/io/orkes/example/saga/service/PaymentService.java +++ b/src/main/java/io/orkes/example/saga/service/PaymentService.java @@ -37,7 +37,7 @@ public static Payment createPayment(PaymentRequest paymentRequest) { } // Check for payment methods for the rider // If exists, try to make the payment - else if (paymentMethod.getId() > 0) { + else if (paymentMethod.getId() >= 0) { payment.setPaymentMethodId(paymentMethod.getId()); // Call external Payments API if(makePayment(payment)) { diff --git a/src/main/java/io/orkes/example/saga/service/WorkflowService.java b/src/main/java/io/orkes/example/saga/service/WorkflowService.java index 2f5d609..60e83f3 100644 --- a/src/main/java/io/orkes/example/saga/service/WorkflowService.java +++ b/src/main/java/io/orkes/example/saga/service/WorkflowService.java @@ -40,9 +40,7 @@ public Map startRideBookingWorkflow(BookingRequest bookingReques } Map inputData = new HashMap<>(); - inputData.put("riderId", bookingRequest.getRiderId()); - inputData.put("dropOffLocation", bookingRequest.getDropOffLocation()); - inputData.put("pickUpLocation", bookingRequest.getPickUpLocation()); + inputData.put("bookingRequest", bookingRequest); request.setInput(inputData); String workflowId = ""; diff --git a/src/main/java/io/orkes/example/saga/workers/ConductorWorkers.java b/src/main/java/io/orkes/example/saga/workers/ConductorWorkers.java index fb6139d..6ed116e 100644 --- a/src/main/java/io/orkes/example/saga/workers/ConductorWorkers.java +++ b/src/main/java/io/orkes/example/saga/workers/ConductorWorkers.java @@ -1,6 +1,7 @@ package io.orkes.example.saga.workers; import com.netflix.conductor.common.metadata.tasks.TaskResult; +import com.netflix.conductor.sdk.workflow.task.InputParam; import com.netflix.conductor.sdk.workflow.task.WorkerTask; import io.orkes.example.saga.pojos.*; import io.orkes.example.saga.service.BookingService; @@ -21,7 +22,7 @@ public class ConductorWorkers { * Note: Using this setting, up to 5 tasks will run in parallel, with tasks being polled every 200ms */ @WorkerTask(value = "book_ride", threadCount = 3, pollingInterval = 300) - public TaskResult checkForBookingRideTask(BookingRequest bookingRequest) { + public TaskResult checkForBookingRideTask(@InputParam("bookingRequest") BookingRequest bookingRequest) { String bookingId = BookingService.createBooking(bookingRequest); TaskResult result = new TaskResult(); diff --git a/src/main/resources/cab_service_saga_booking_wf.json b/src/main/resources/cab_service_saga_booking_wf.json index 308a991..bf3fb0e 100644 --- a/src/main/resources/cab_service_saga_booking_wf.json +++ b/src/main/resources/cab_service_saga_booking_wf.json @@ -7,9 +7,7 @@ "name": "book_ride", "taskReferenceName": "book_ride_ref", "inputParameters": { - "riderId": "${workflow.input.riderId}", - "pickUpLocation": "${workflow.input.pickUpLocation}", - "dropOffLocation": "${workflow.input.dropOffLocation}" + "bookingRequest": "${workflow.input.bookingRequest}" }, "type": "SIMPLE", "decisionCases": {}, @@ -46,7 +44,7 @@ "taskReferenceName": "make_payment_ref", "inputParameters": { "bookingId": "${assign_driver_ref.output.bookingId}", - "riderId": "${workflow.input.riderId}" + "riderId": "${workflow.input.bookingRequest.riderId}" }, "type": "SIMPLE", "decisionCases": {}, @@ -91,11 +89,11 @@ "name": "notify_customer", "taskReferenceName": "notify_customer_ref", "inputParameters": { - "riderId": "${workflow.input.riderId}", + "riderId": "${workflow.input.bookingRequest.riderId}", "driverId": "${assign_driver_ref.output.driverId}", - "dropOffLocation": "${workflow.input.dropOffLocation}", + "dropOffLocation": "${workflow.input.bookingRequest.dropOffLocation}", "bookingId": "${book_ride_ref.output.bookingId}", - "pickUpLocation": "${workflow.input.pickUpLocation}" + "pickUpLocation": "${workflow.input.bookingRequest.pickUpLocation}" }, "type": "SIMPLE", "decisionCases": {}, @@ -117,8 +115,8 @@ "inputParameters": { "bookingId": "${book_ride_ref.output.bookingId}", "driverId": "${assign_driver_ref.output.driverId}", - "pickUp": "${workflow.input.pickUpLocation}", - "dropOff": "${workflow.input.dropOffLocation}" + "pickUp": "${workflow.input.bookingRequest.pickUpLocation}", + "dropOff": "${workflow.input.bookingRequest.dropOffLocation}" }, "type": "SIMPLE", "decisionCases": {},