Skip to content

Commit

Permalink
improve code according to proposal
Browse files Browse the repository at this point in the history
Signed-off-by: Sky Ao <[email protected]>
  • Loading branch information
skyao committed Nov 21, 2023
1 parent 6bdf2cd commit f91144a
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
import io.dapr.workflows.Workflow;
import io.dapr.workflows.WorkflowStub;
import io.dapr.workflows.saga.Saga;
import io.dapr.workflows.saga.SagaConfiguration;
import io.dapr.workflows.saga.SagaCompensationException;
import io.dapr.workflows.saga.SagaOption;

public class OrderProcessingWorkflow extends Workflow {

Expand All @@ -33,10 +34,10 @@ public WorkflowStub create() {
logger.info("Instance ID(order ID): " + orderId);
logger.info("Current Orchestration Time: " + ctx.getCurrentInstant());

SagaConfiguration config = SagaConfiguration.newBuilder()
SagaOption option = SagaOption.newBuilder()
.setParallelCompensation(false)
.setContinueWithError(true).build();
Saga saga = new Saga(config);
Saga saga = new Saga(option);

OrderPayload order = ctx.getInput(OrderPayload.class);
logger.info("Received Order: " + order.toString());
Expand Down Expand Up @@ -95,7 +96,7 @@ public WorkflowStub create() {
return;
}
// payment activity is processed, register for compensation
saga.registerCompensation(ProcessPaymentActivity.class.getName(), paymentRequest, isOK);
saga.registerCompensation(ProcessPaymentActivity.class.getName(), paymentRequest);

// step5: Update the inventory (need compensation)
inventoryResult = ctx.callActivity(UpdateInventoryActivity.class.getName(),
Expand All @@ -104,12 +105,15 @@ public WorkflowStub create() {
// Let users know their payment processing failed
notification.setMessage("Order failed to update inventory! : " + orderId);
ctx.callActivity(NotifyActivity.class.getName(), notification).await();

// throw exception to trigger compensation
throw new RuntimeException("Failed to update inventory");

// trigger saga compensation gracefully
saga.compensate();
orderResult.setCompensated(true);
ctx.complete(orderResult);
return;
}
// Update Inventory activity is succeed, register for compensation
saga.registerCompensation(UpdateInventoryActivity.class.getName(), inventoryRequest, inventoryResult);
saga.registerCompensation(UpdateInventoryActivity.class.getName(), inventoryRequest);

// step6: delevery (allways be failed to trigger compensation)
ctx.callActivity(DeliveryActivity.class.getName(), null).await();
Expand All @@ -124,13 +128,17 @@ public WorkflowStub create() {
orderResult.setProcessed(true);
ctx.complete(orderResult);
} catch (OrchestratorBlockedException e) {
//TODO: try to improve design and remove this exception catch
throw e;
} catch (SagaCompensationException e) {
// Saga compensation is triggered gracefully but failed
// don't need to trigger compensation again
throw e;
} catch (Exception e) {
// trigger saga compensation on exception
saga.compensate();

orderResult.setCompensated(true);
ctx.complete(orderResult);

saga.compensate();
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public static void main(String[] args) throws Exception {
System.out.println("*** Welcome to the Dapr saga console app sample!");
System.out.println("*** Using this app, you can place orders that start workflows.");
// Wait for the sidecar to become available
Thread.sleep(5 * 1000);
Thread.sleep(2 * 1000);

// Register the OrderProcessingWorkflow and its activities with the builder.
WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder().registerWorkflow(OrderProcessingWorkflow.class);
Expand Down Expand Up @@ -116,8 +116,18 @@ private static InventoryItem prepareInventoryAndOrder() {
inventory.setName("cars");
inventory.setPerItemCost(15000);
inventory.setQuantity(100);
DaprClient daprClient = new DaprClientBuilder().build();
restockInventory(daprClient, inventory);

DaprClient daprClient = null;
try {
daprClient = new DaprClientBuilder().build();
restockInventory(daprClient, inventory);
} finally {
try {
daprClient.close();
} catch (Exception e) {
e.printStackTrace();
}
}

// prepare order for 10 cars
InventoryItem order = new InventoryItem();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public Object run(WorkflowActivityContext ctx) {
}

@Override
public void compensate(Object activityInput, Object activityOutput) {
public void compensate(Object activityInput) {
PaymentRequest input = (PaymentRequest) activityInput;

logger.info("Compensating payment for request ID '{}' at ${}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public Object run(WorkflowActivityContext ctx) {
}

@Override
public void compensate(Object activityInput, Object activityOutput) {
public void compensate(Object activityInput) {
InventoryRequest inventoryRequest = (InventoryRequest) activityInput;
logger.info("Compensating inventory for order '{}' of {} {}",
inventoryRequest.getRequestId(), inventoryRequest.getQuantity(), inventoryRequest.getItemName());
Expand Down

0 comments on commit f91144a

Please sign in to comment.