From 0f96641f6d67e366d6b3efeb146518a54ce4ce12 Mon Sep 17 00:00:00 2001 From: Stephan Pelikan Date: Tue, 4 Jun 2024 13:35:49 +0200 Subject: [PATCH] Stop unintentionally auto-completing user-tasks in transactions --- .../Camunda8TransactionInterceptor.java | 26 ++++++++-------- .../service/Camunda8TransactionProcessor.java | 14 ++++----- .../camunda8/wiring/Camunda8TaskHandler.java | 30 +++++++++++-------- 3 files changed, 39 insertions(+), 31 deletions(-) diff --git a/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionInterceptor.java b/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionInterceptor.java index d4ff928..6db8d5c 100644 --- a/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionInterceptor.java +++ b/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionInterceptor.java @@ -1,17 +1,16 @@ package io.vanillabp.camunda8.service; import io.vanillabp.spi.service.TaskException; -import org.springframework.context.ApplicationEventPublisher; -import org.springframework.transaction.TransactionManager; -import org.springframework.transaction.interceptor.TransactionAttributeSource; -import org.springframework.transaction.interceptor.TransactionInterceptor; -import org.springframework.transaction.support.TransactionSynchronizationManager; - import java.lang.reflect.Method; import java.util.Map; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.transaction.TransactionManager; +import org.springframework.transaction.interceptor.TransactionAttributeSource; +import org.springframework.transaction.interceptor.TransactionInterceptor; +import org.springframework.transaction.support.TransactionSynchronizationManager; public class Camunda8TransactionInterceptor extends TransactionInterceptor { @@ -31,7 +30,7 @@ public static class TaskHandlerActions { public Map.Entry> testForTaskAlreadyCompletedOrCancelledCommand; public Map.Entry, Function> bpmnErrorCommand; public Map.Entry, Function> handlerFailedCommand; - public Map.Entry> handlerCompletedCommand; + public Supplier>> handlerCompletedCommand; } @Override @@ -54,11 +53,14 @@ protected Object invokeWithinTransaction( actions.get().testForTaskAlreadyCompletedOrCancelledCommand.getValue())); } if (actions.get().handlerCompletedCommand != null) { - publisher.publishEvent( - new Camunda8TransactionProcessor.Camunda8CommandAfterTx( - Camunda8TransactionInterceptor.class, - actions.get().handlerCompletedCommand.getKey(), - actions.get().handlerCompletedCommand.getValue())); + final var handlerCompletedCommand = actions.get().handlerCompletedCommand.get(); + if (handlerCompletedCommand != null) { + publisher.publishEvent( + new Camunda8TransactionProcessor.Camunda8CommandAfterTx( + Camunda8TransactionInterceptor.class, + handlerCompletedCommand.getKey(), + handlerCompletedCommand.getValue())); + } } return result; } catch (TaskException taskError) { diff --git a/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionProcessor.java b/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionProcessor.java index 7475240..068bd92 100644 --- a/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionProcessor.java +++ b/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionProcessor.java @@ -3,6 +3,10 @@ import io.camunda.zeebe.client.api.command.ClientStatusException; import io.grpc.Status; import io.vanillabp.spi.service.TaskException; +import java.util.Map; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.ApplicationEvent; @@ -11,11 +15,6 @@ import org.springframework.transaction.interceptor.TransactionAspectSupport; import org.springframework.transaction.support.TransactionSynchronizationManager; -import java.util.Map; -import java.util.function.Consumer; -import java.util.function.Function; -import java.util.function.Supplier; - public class Camunda8TransactionProcessor { private static final Logger logger = LoggerFactory.getLogger(Camunda8TransactionProcessor.class); @@ -24,7 +23,7 @@ public static void registerCallbacks( final Map.Entry> testForTaskAlreadyCompletedOrCancelledCommand, final Map.Entry, Function> bpmnErrorCommand, final Map.Entry, Function> handlerFailedCommand, - final Map.Entry> handlerCompletedCommand) { + final Supplier>> handlerCompletedCommand) { final var actions = Camunda8TransactionInterceptor.actions.get(); actions.testForTaskAlreadyCompletedOrCancelledCommand = testForTaskAlreadyCompletedOrCancelledCommand; @@ -57,7 +56,8 @@ public static Map.Entry> handlerCompletedCommandCallb return Camunda8TransactionInterceptor .actions .get() - .handlerCompletedCommand; + .handlerCompletedCommand + .get(); } diff --git a/spring-boot/src/main/java/io/vanillabp/camunda8/wiring/Camunda8TaskHandler.java b/spring-boot/src/main/java/io/vanillabp/camunda8/wiring/Camunda8TaskHandler.java index 848d27f..17ac882 100644 --- a/spring-boot/src/main/java/io/vanillabp/camunda8/wiring/Camunda8TaskHandler.java +++ b/spring-boot/src/main/java/io/vanillabp/camunda8/wiring/Camunda8TaskHandler.java @@ -14,10 +14,6 @@ import io.vanillabp.springboot.adapter.TaskHandlerBase; import io.vanillabp.springboot.adapter.wiring.WorkflowAggregateCache; import io.vanillabp.springboot.parameters.MethodParameter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.data.repository.CrudRepository; - import java.lang.reflect.Method; import java.time.Duration; import java.util.List; @@ -27,6 +23,9 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.data.repository.CrudRepository; public class Camunda8TaskHandler extends TaskHandlerBase implements JobHandler, Consumer { @@ -88,11 +87,21 @@ public void handle( final var taskIdRetrieved = new AtomicBoolean(false); final var workflowAggregateCache = new WorkflowAggregateCache(); + // Any callback used in this method is executed in case of no active transaction. + // In case of an active transaction the callbacks are used by the Camunda8TransactionInterceptor. Camunda8TransactionProcessor.registerCallbacks( doTestForTaskWasCompletedOrCancelled(job), doThrowError(client, job, workflowAggregateCache), doFailed(client, job), - doComplete(client, job, workflowAggregateCache)); + () -> { + if (taskType == Type.USERTASK) { + return null; + } + if (taskIdRetrieved.get()) { // async processing of service-task + return null; + } + return doComplete(client, job, workflowAggregateCache); + }); final Function multiInstanceSupplier = multiInstanceVariable -> getVariable(job, multiInstanceVariable); @@ -140,13 +149,10 @@ public void handle( return workflowAggregateCache.workflowAggregate; }, multiInstanceSupplier)); - if ((taskType != Type.USERTASK) - && !taskIdRetrieved.get()) { - final var callback = Camunda8TransactionProcessor.handlerCompletedCommandCallback(); - if (callback != null) { - jobPostAction = callback.getKey(); - description = callback.getValue(); - } + final var callback = Camunda8TransactionProcessor.handlerCompletedCommandCallback(); + if (callback != null) { + jobPostAction = callback.getKey(); + description = callback.getValue(); } } catch (TaskException bpmnError) { final var callback = Camunda8TransactionProcessor.bpmnErrorCommandCallback();