Skip to content

Commit

Permalink
Stop unintentionally auto-completing user-tasks in transactions
Browse files Browse the repository at this point in the history
  • Loading branch information
stephanpelikan committed Jun 4, 2024
1 parent e5eb543 commit 0f96641
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 31 deletions.
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
package io.vanillabp.camunda8.service;

import io.vanillabp.spi.service.TaskException;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.transaction.TransactionManager;
import org.springframework.transaction.interceptor.TransactionAttributeSource;
import org.springframework.transaction.interceptor.TransactionInterceptor;
import org.springframework.transaction.support.TransactionSynchronizationManager;

import java.lang.reflect.Method;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.transaction.TransactionManager;
import org.springframework.transaction.interceptor.TransactionAttributeSource;
import org.springframework.transaction.interceptor.TransactionInterceptor;
import org.springframework.transaction.support.TransactionSynchronizationManager;

public class Camunda8TransactionInterceptor extends TransactionInterceptor {

Expand All @@ -31,7 +30,7 @@ public static class TaskHandlerActions {
public Map.Entry<Runnable, Supplier<String>> testForTaskAlreadyCompletedOrCancelledCommand;
public Map.Entry<Consumer<TaskException>, Function<TaskException, String>> bpmnErrorCommand;
public Map.Entry<Consumer<Exception>, Function<Exception, String>> handlerFailedCommand;
public Map.Entry<Runnable, Supplier<String>> handlerCompletedCommand;
public Supplier<Map.Entry<Runnable, Supplier<String>>> handlerCompletedCommand;
}

@Override
Expand All @@ -54,11 +53,14 @@ protected Object invokeWithinTransaction(
actions.get().testForTaskAlreadyCompletedOrCancelledCommand.getValue()));
}
if (actions.get().handlerCompletedCommand != null) {
publisher.publishEvent(
new Camunda8TransactionProcessor.Camunda8CommandAfterTx(
Camunda8TransactionInterceptor.class,
actions.get().handlerCompletedCommand.getKey(),
actions.get().handlerCompletedCommand.getValue()));
final var handlerCompletedCommand = actions.get().handlerCompletedCommand.get();
if (handlerCompletedCommand != null) {
publisher.publishEvent(
new Camunda8TransactionProcessor.Camunda8CommandAfterTx(
Camunda8TransactionInterceptor.class,
handlerCompletedCommand.getKey(),
handlerCompletedCommand.getValue()));
}
}
return result;
} catch (TaskException taskError) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
import io.camunda.zeebe.client.api.command.ClientStatusException;
import io.grpc.Status;
import io.vanillabp.spi.service.TaskException;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationEvent;
Expand All @@ -11,11 +15,6 @@
import org.springframework.transaction.interceptor.TransactionAspectSupport;
import org.springframework.transaction.support.TransactionSynchronizationManager;

import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

public class Camunda8TransactionProcessor {

private static final Logger logger = LoggerFactory.getLogger(Camunda8TransactionProcessor.class);
Expand All @@ -24,7 +23,7 @@ public static void registerCallbacks(
final Map.Entry<Runnable, Supplier<String>> testForTaskAlreadyCompletedOrCancelledCommand,
final Map.Entry<Consumer<TaskException>, Function<TaskException, String>> bpmnErrorCommand,
final Map.Entry<Consumer<Exception>, Function<Exception, String>> handlerFailedCommand,
final Map.Entry<Runnable, Supplier<String>> handlerCompletedCommand) {
final Supplier<Map.Entry<Runnable, Supplier<String>>> handlerCompletedCommand) {

final var actions = Camunda8TransactionInterceptor.actions.get();
actions.testForTaskAlreadyCompletedOrCancelledCommand = testForTaskAlreadyCompletedOrCancelledCommand;
Expand Down Expand Up @@ -57,7 +56,8 @@ public static Map.Entry<Runnable, Supplier<String>> handlerCompletedCommandCallb
return Camunda8TransactionInterceptor
.actions
.get()
.handlerCompletedCommand;
.handlerCompletedCommand
.get();

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,6 @@
import io.vanillabp.springboot.adapter.TaskHandlerBase;
import io.vanillabp.springboot.adapter.wiring.WorkflowAggregateCache;
import io.vanillabp.springboot.parameters.MethodParameter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.repository.CrudRepository;

import java.lang.reflect.Method;
import java.time.Duration;
import java.util.List;
Expand All @@ -27,6 +23,9 @@
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.repository.CrudRepository;

public class Camunda8TaskHandler extends TaskHandlerBase implements JobHandler, Consumer<ZeebeClient> {

Expand Down Expand Up @@ -88,11 +87,21 @@ public void handle(
final var taskIdRetrieved = new AtomicBoolean(false);
final var workflowAggregateCache = new WorkflowAggregateCache();

// Any callback used in this method is executed in case of no active transaction.
// In case of an active transaction the callbacks are used by the Camunda8TransactionInterceptor.
Camunda8TransactionProcessor.registerCallbacks(
doTestForTaskWasCompletedOrCancelled(job),
doThrowError(client, job, workflowAggregateCache),
doFailed(client, job),
doComplete(client, job, workflowAggregateCache));
() -> {
if (taskType == Type.USERTASK) {
return null;
}
if (taskIdRetrieved.get()) { // async processing of service-task
return null;
}
return doComplete(client, job, workflowAggregateCache);
});

final Function<String, Object> multiInstanceSupplier
= multiInstanceVariable -> getVariable(job, multiInstanceVariable);
Expand Down Expand Up @@ -140,13 +149,10 @@ public void handle(
return workflowAggregateCache.workflowAggregate;
}, multiInstanceSupplier));

if ((taskType != Type.USERTASK)
&& !taskIdRetrieved.get()) {
final var callback = Camunda8TransactionProcessor.handlerCompletedCommandCallback();
if (callback != null) {
jobPostAction = callback.getKey();
description = callback.getValue();
}
final var callback = Camunda8TransactionProcessor.handlerCompletedCommandCallback();
if (callback != null) {
jobPostAction = callback.getKey();
description = callback.getValue();
}
} catch (TaskException bpmnError) {
final var callback = Camunda8TransactionProcessor.bpmnErrorCommandCallback();
Expand Down

0 comments on commit 0f96641

Please sign in to comment.