From c9e33a25260a9a13b8f147fcaaaffe9c077da0ec Mon Sep 17 00:00:00 2001 From: Francisco Javier Tirado Sarti <65240126+fjtirado@users.noreply.github.com> Date: Tue, 27 Feb 2024 11:58:32 +0100 Subject: [PATCH] [KOGITO-9276] Adding support for businessKey to resume a process (#3412) * [KOGITO-9276] Adding support for businnesKey to resume a process * [KOGITO-9276] Cleaner alternative * [KOGITO-9276] Gonzalos comments --- .../kie/kogito/process/ProcessInstances.java | 4 + .../event/impl/ProcessEventDispatcher.java | 82 ++++++++++++------- .../quarkus/workflows/AssuredTestUtils.java | 33 ++++++-- .../kogito/quarkus/workflows/EventFlowIT.java | 19 ++++- 4 files changed, 94 insertions(+), 44 deletions(-) diff --git a/api/kogito-api/src/main/java/org/kie/kogito/process/ProcessInstances.java b/api/kogito-api/src/main/java/org/kie/kogito/process/ProcessInstances.java index d6233541d45..6cc0d95649c 100644 --- a/api/kogito-api/src/main/java/org/kie/kogito/process/ProcessInstances.java +++ b/api/kogito-api/src/main/java/org/kie/kogito/process/ProcessInstances.java @@ -29,6 +29,10 @@ default Optional> findById(String id) { Optional> findById(String id, ProcessInstanceReadMode mode); + default Optional> findByBusinessKey(String id) { + return stream().filter(pi -> id.equals(pi.businessKey())).findAny(); + } + Stream> stream(ProcessInstanceReadMode mode); default Stream> stream() { diff --git a/api/kogito-events-core/src/main/java/org/kie/kogito/event/impl/ProcessEventDispatcher.java b/api/kogito-events-core/src/main/java/org/kie/kogito/event/impl/ProcessEventDispatcher.java index 48d46a04fd1..704e854865c 100644 --- a/api/kogito-events-core/src/main/java/org/kie/kogito/event/impl/ProcessEventDispatcher.java +++ b/api/kogito-events-core/src/main/java/org/kie/kogito/event/impl/ProcessEventDispatcher.java @@ -37,7 +37,6 @@ import org.kie.kogito.correlation.SimpleCorrelation; import org.kie.kogito.event.DataEvent; import org.kie.kogito.event.EventDispatcher; -import org.kie.kogito.internal.utils.ConversionUtils; import org.kie.kogito.process.Process; import org.kie.kogito.process.ProcessInstance; import org.kie.kogito.process.ProcessService; @@ -74,20 +73,52 @@ public CompletableFuture> dispatch(String trigger, DataEvent< } return CompletableFuture.completedFuture(null); } + return resolveCorrelationId(event) + .map(kogitoReferenceId -> asCompletable(trigger, event, findById(kogitoReferenceId))) + .orElseGet(() -> { + // check processInstanceId + String processInstanceId = event.getKogitoReferenceId(); + if (processInstanceId != null) { + return asCompletable(trigger, event, findById(processInstanceId)); + } + // check businessKey + String businessKey = event.getKogitoBusinessKey(); + if (businessKey != null) { + return asCompletable(trigger, event, findByBusinessKey(businessKey)); + } + // try to start a new instance if possible + return CompletableFuture.supplyAsync(() -> startNewInstance(trigger, event), executor); + }); + } - final String kogitoReferenceId = resolveCorrelationId(event); - if (!ConversionUtils.isEmpty(kogitoReferenceId)) { - return CompletableFuture.supplyAsync(() -> handleMessageWithReference(trigger, event, kogitoReferenceId), executor); - } + private CompletableFuture> asCompletable(String trigger, DataEvent event, Optional> processInstance) { - //if the trigger is for a start event (model converter is set only for start node) - if (modelConverter.isPresent()) { - return CompletableFuture.supplyAsync(() -> startNewInstance(trigger, event), executor); + return CompletableFuture.supplyAsync(() -> processInstance.map(pi -> { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Sending signal {} to process instance id '{}'", trigger, pi.id()); + } + signalProcessInstance(trigger, pi.id(), event); + return pi; + }).orElseGet(() -> startNewInstance(trigger, event)), executor); + } + + private Optional> findById(String id) { + LOGGER.debug("Received message with process instance id '{}'", id); + Optional> result = process.instances().findById(id); + if (LOGGER.isDebugEnabled() && result.isEmpty()) { + LOGGER.debug("No instance found for process instance id '{}'", id); } - if (LOGGER.isInfoEnabled()) { - LOGGER.info("No matches found for trigger {} in process {}. Skipping consumed message {}", trigger, process.id(), event); + return result; + + } + + private Optional> findByBusinessKey(String key) { + LOGGER.debug("Received message with business key '{}'", key); + Optional> result = process.instances().findByBusinessKey(key); + if (LOGGER.isDebugEnabled() && result.isEmpty()) { + LOGGER.debug("No instance found for business key '{}'", key); } - return CompletableFuture.completedFuture(null); + return result; } private Optional compositeCorrelation(DataEvent event) { @@ -95,10 +126,10 @@ private Optional compositeCorrelation(DataEvent event) correlationKeys.stream().map(k -> new SimpleCorrelation<>(k, resolve(event, k))).collect(Collectors.toSet()))) : Optional.empty(); } - private String resolveCorrelationId(DataEvent event) { + private Optional resolveCorrelationId(DataEvent event) { return compositeCorrelation(event).flatMap(process.correlations()::find) - .map(CorrelationInstance::getCorrelatedId) - .orElseGet(event::getKogitoReferenceId); + .map(CorrelationInstance::getCorrelatedId); + } private Object resolve(DataEvent event, String key) { @@ -113,22 +144,6 @@ private Object resolve(DataEvent event, String key) { } } - private ProcessInstance handleMessageWithReference(String trigger, DataEvent event, String instanceId) { - LOGGER.debug("Received message with reference id '{}' going to use it to send signal '{}'", - instanceId, - trigger); - return process.instances() - .findById(instanceId) - .map(instance -> { - signalProcessInstance(trigger, instance.id(), event); - return instance; - }) - .orElseGet(() -> { - LOGGER.info("Process instance with id '{}' not found for triggering signal '{}'", instanceId, trigger); - return startNewInstance(trigger, event); - }); - } - private Optional signalProcessInstance(String trigger, String id, DataEvent event) { return processService.signalProcessInstance((Process) process, id, dataResolver.apply(event), "Message-" + trigger); } @@ -139,7 +154,12 @@ private ProcessInstance startNewInstance(String trigger, DataEvent event) return processService.createProcessInstance(process, event.getKogitoBusinessKey(), m.apply(dataResolver.apply(event)), headersFromEvent(event), event.getKogitoStartFromNode(), trigger, event.getKogitoProcessInstanceId(), compositeCorrelation(event).orElse(null)); - }).orElse(null); + }).orElseGet(() -> { + if (LOGGER.isInfoEnabled()) { + LOGGER.info("No matches found for trigger {} in process {}. Skipping consumed message {}", trigger, process.id(), event); + } + return null; + }); } protected Map> headersFromEvent(DataEvent event) { diff --git a/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/AssuredTestUtils.java b/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/AssuredTestUtils.java index 92a031c10e9..b620b2de526 100644 --- a/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/AssuredTestUtils.java +++ b/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/AssuredTestUtils.java @@ -22,13 +22,16 @@ import java.time.Duration; import java.time.OffsetDateTime; import java.util.Collections; +import java.util.Optional; import java.util.UUID; import org.kie.kogito.event.CloudEventMarshaller; +import org.kie.kogito.event.cloudevents.CloudEventExtensionConstants; import io.cloudevents.CloudEvent; import io.cloudevents.core.builder.CloudEventBuilder; import io.restassured.http.ContentType; +import io.restassured.specification.RequestSpecification; import static io.restassured.RestAssured.given; import static java.util.concurrent.TimeUnit.SECONDS; @@ -40,8 +43,11 @@ private AssuredTestUtils() { } static String startProcess(String flowName) { - String id = startProcessNoCheck(flowName); + return startProcess(flowName, Optional.empty()); + } + static String startProcess(String flowName, Optional businessKey) { + String id = startProcessNoCheck(flowName, businessKey); given() .contentType(ContentType.JSON) .accept(ContentType.JSON) @@ -52,11 +58,16 @@ static String startProcess(String flowName) { } static String startProcessNoCheck(String flowName) { - return given() + return startProcessNoCheck(flowName, Optional.empty()); + } + + static String startProcessNoCheck(String flowName, Optional businessKey) { + RequestSpecification body = given() .contentType(ContentType.JSON) .when() - .body(Collections.singletonMap("workflowdata", Collections.emptyMap())) - .post("/" + flowName) + .body(Collections.singletonMap("workflowdata", Collections.emptyMap())); + businessKey.ifPresent(key -> body.queryParam("businessKey", key)); + return body.post("/" + flowName) .then() .statusCode(201) .extract().path("id"); @@ -73,15 +84,19 @@ static void waitForFinish(String flowName, String id, Duration duration) { .statusCode(404)); } - static CloudEvent buildCloudEvent(String id, String type, CloudEventMarshaller marshaller) { - return CloudEventBuilder.v1() + static CloudEvent buildCloudEvent(String id, Optional businessKey, String type, CloudEventMarshaller marshaller) { + io.cloudevents.core.v1.CloudEventBuilder builder = CloudEventBuilder.v1() .withId(UUID.randomUUID().toString()) .withSource(URI.create("")) .withType(type) .withTime(OffsetDateTime.now()) - .withExtension("kogitoprocrefid", id) - .withData(marshaller.cloudEventDataFactory().apply(Collections.singletonMap(type, "This has been injected by the event"))) - .build(); + .withData(marshaller.cloudEventDataFactory().apply(Collections.singletonMap(type, "This has been injected by the event"))); + businessKey.ifPresentOrElse(key -> builder.withExtension(CloudEventExtensionConstants.BUSINESS_KEY, key), () -> builder.withExtension(CloudEventExtensionConstants.PROCESS_REFERENCE_ID, id)); + return builder.build(); + } + + static CloudEvent buildCloudEvent(String id, String type, CloudEventMarshaller marshaller) { + return buildCloudEvent(id, Optional.empty(), type, marshaller); } } diff --git a/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/EventFlowIT.java b/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/EventFlowIT.java index 50fcb8e822f..91ede4a21f9 100644 --- a/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/EventFlowIT.java +++ b/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/EventFlowIT.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.time.Duration; import java.util.Map; +import java.util.Optional; import org.awaitility.core.ConditionTimeoutException; import org.junit.jupiter.api.BeforeAll; @@ -57,7 +58,7 @@ static void init() { @Test void testNotStartingEvent() throws IOException { - doIt("nonStartEvent", "move"); + doIt("nonStartEvent", Optional.of("manolo"), "move"); } @Test @@ -108,25 +109,35 @@ void testNotStartingMultipleEventRainy() throws IOException { } private void sendEvents(String id, String... eventTypes) throws IOException { + sendEvents(id, Optional.empty(), eventTypes); + } + + private void sendEvents(String id, Optional businessKey, String... eventTypes) throws IOException { for (String eventType : eventTypes) { given() .contentType(ContentType.JSON) .when() - .body(generateCloudEvent(id, eventType)) + .body(generateCloudEvent(id, businessKey, eventType)) .post("/" + eventType) .then() .statusCode(202); } } + private void doIt(String flowName, Optional businessKey, String... eventTypes) throws IOException { + String id = startProcess(flowName, businessKey); + sendEvents(id, businessKey, eventTypes); + waitForFinish(flowName, id, Duration.ofSeconds(15)); + } + private void doIt(String flowName, String... eventTypes) throws IOException { String id = startProcess(flowName); sendEvents(id, eventTypes); waitForFinish(flowName, id, Duration.ofSeconds(15)); } - private byte[] generateCloudEvent(String id, String type) throws IOException { + private byte[] generateCloudEvent(String id, Optional businessKey, String type) throws IOException { CloudEventMarshaller marshaller = marshallers.getOrDefault(type, defaultMarshaller); - return marshaller.marshall(buildCloudEvent(id, type, marshaller)); + return marshaller.marshall(buildCloudEvent(id, businessKey, type, marshaller)); } }