Skip to content

Commit

Permalink
[Fix_#3451] Fixing start event state filtering (apache#3542)
Browse files Browse the repository at this point in the history
* [Fix_#3451] Fixing start event state filtering

* [Fix_#3451] Add IT test
  • Loading branch information
fjtirado authored and rgdoliveira committed Jun 10, 2024
1 parent 58c934a commit ff3b8b6
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.jbpm.workflow.core.node.Join;
import org.jbpm.workflow.core.node.Split;
import org.kie.kogito.serverless.workflow.parser.ParserContext;
import org.kie.kogito.serverless.workflow.parser.ServerlessWorkflowParser;

import io.serverlessworkflow.api.Workflow;
import io.serverlessworkflow.api.events.OnEvents;
Expand Down Expand Up @@ -65,7 +64,7 @@ public MakeNodeResult makeNode(RuleFlowNodeContainerFactory<?, ?> factory) {

private MakeNodeResult processOnEvent(RuleFlowNodeContainerFactory<?, ?> factory, OnEvents onEvent) {
MakeNodeResult result = joinNodes(factory,
onEvent.getEventRefs(), (fact, onEventRef) -> filterAndMergeNode(fact, onEvent.getEventDataFilter(), isStartState ? ServerlessWorkflowParser.DEFAULT_WORKFLOW_VAR : getVarName(),
onEvent.getEventRefs(), (fact, onEventRef) -> filterAndMergeNode(fact, onEvent.getEventDataFilter(), getVarName(),
(f, inputVar, outputVar) -> buildEventNode(f, onEventRef, inputVar, outputVar)));
CompositeContextNodeFactory<?> embeddedSubProcess = handleActions(makeCompositeNode(factory), onEvent.getActions());
connect(result.getOutgoingNode(), embeddedSubProcess);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.kie.kogito.serverless.workflow.SWFConstants;
import org.kie.kogito.serverless.workflow.parser.ParserContext;
import org.kie.kogito.serverless.workflow.parser.ServerlessWorkflowParser;
import org.kie.kogito.serverless.workflow.parser.VariableInfo;
import org.kie.kogito.serverless.workflow.suppliers.CollectorActionSupplier;
import org.kie.kogito.serverless.workflow.suppliers.CompensationActionSupplier;
import org.kie.kogito.serverless.workflow.suppliers.ErrorExpressionActionSupplier;
Expand Down Expand Up @@ -424,7 +425,8 @@ protected final MakeNodeResult filterAndMergeNode(RuleFlowNodeContainerFactory<?
toExpr = eventFilter.getToStateData();
useData = eventFilter.isUseData();
}
return filterAndMergeNode(embeddedSubProcess, varName, null, dataExpr, toExpr, useData, true, nodeSupplier);
return filterAndMergeNode(embeddedSubProcess, isStartState ? new VariableInfo(DEFAULT_WORKFLOW_VAR, varName) : new VariableInfo(varName, varName), null, dataExpr, toExpr, useData, true,
nodeSupplier);
}

protected boolean isTempVariable(String varName) {
Expand All @@ -434,7 +436,13 @@ protected boolean isTempVariable(String varName) {
protected final MakeNodeResult filterAndMergeNode(RuleFlowNodeContainerFactory<?, ?> embeddedSubProcess, String actionVarName, String fromStateExpr, String resultExpr, String toStateExpr,
boolean useData,
boolean shouldMerge, FilterableNodeSupplier nodeSupplier) {
return filterAndMergeNode(embeddedSubProcess, new VariableInfo(actionVarName, actionVarName), fromStateExpr, resultExpr, toStateExpr, useData, shouldMerge, nodeSupplier);
}

protected final MakeNodeResult filterAndMergeNode(RuleFlowNodeContainerFactory<?, ?> embeddedSubProcess, VariableInfo variableInfo, String fromStateExpr, String resultExpr, String toStateExpr,
boolean useData,
boolean shouldMerge, FilterableNodeSupplier nodeSupplier) {
String actionVarName = variableInfo.getOutputVar();
if (isTempVariable(actionVarName)) {
embeddedSubProcess.variable(actionVarName, new ObjectDataType(JsonNode.class.getCanonicalName()), Map.of(KogitoTags.VARIABLE_TAGS, KogitoTags.INTERNAL_TAG));
}
Expand All @@ -451,7 +459,7 @@ protected final MakeNodeResult filterAndMergeNode(RuleFlowNodeContainerFactory<?

if (useData && resultExpr != null) {
currentNode = connect(currentNode, embeddedSubProcess.actionNode(parserContext.newId()).action(ExpressionActionSupplier.of(workflow, resultExpr)
.withVarNames(actionVarName, actionVarName).build()));
.withVarNames(variableInfo.getInputVar(), actionVarName).build()));
}

if (useData) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@ mp.messaging.incoming.never.path=/never
mp.messaging.incoming.eventTimeout1.connector=quarkus-http
mp.messaging.incoming.eventTimeout1.path=/eventTimeout1

mp.messaging.incoming.eventTimeout1.connector=quarkus-http
mp.messaging.incoming.eventTimeout1.path=/eventTimeout1

mp.messaging.incoming.customer-arrival-type.connector=quarkus-http
mp.messaging.incoming.customer-arrival-type.path=/eventWithToStateFilter

mp.messaging.incoming.eventTimeout2.connector=quarkus-http
mp.messaging.incoming.eventTimeout2.path=/eventTimeout2

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
{
"id": "GreetCustomer",
"name": "Greet Customers when they arrive",
"version": "1.0",
"specVersion": "0.8",
"start": "WaitForCustomerToArrive",
"states":[
{
"name": "WaitForCustomerToArrive",
"type": "event",
"onEvents": [{
"eventRefs": ["CustomerArrivesEvent"],
"eventDataFilter": {
"data": "${ .customer }",
"toStateData": "${ .customerInfo }"
},
"actions":[
{
"functionRef": {
"refName": "greetingFunction",
"arguments": {
"message": "${ .customerInfo.name } "
}
}
}
]
}],
"end": true
}
],
"events": [{
"name": "CustomerArrivesEvent",
"type": "customer-arrival-type",
"source": "customer-arrival-event-source"
}],
"functions": [{
"name": "greetingFunction",
"type": "custom",
"operation": "sysout"
}]
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,13 @@
package org.kie.kogito.quarkus.workflows;

import java.io.IOException;
import java.net.URI;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

import org.awaitility.core.ConditionTimeoutException;
import org.junit.jupiter.api.BeforeAll;
Expand All @@ -32,6 +36,7 @@

import com.fasterxml.jackson.databind.ObjectMapper;

import io.cloudevents.core.builder.CloudEventBuilder;
import io.cloudevents.jackson.JsonFormat;
import io.quarkus.test.junit.QuarkusIntegrationTest;
import io.restassured.RestAssured;
Expand All @@ -56,6 +61,23 @@ static void init() {
defaultMarshaller = new ByteArrayCloudEventMarshaller(mapper);
}

@Test
void testStartingEventWithToStateFilter() {
given()
.contentType(ContentType.JSON)
.when()
.body(CloudEventBuilder.v1()
.withId(UUID.randomUUID().toString())
.withSource(URI.create("customer-arrival-event-source"))
.withType("customer-arrival-type")
.withTime(OffsetDateTime.now())
.withData(defaultMarshaller.cloudEventDataFactory().apply(Collections.singletonMap("customer", Map.of("name", "pepe")))).build())
.post("/eventWithToStateFilter")
.then()
.statusCode(202);

}

@Test
void testNotStartingEvent() throws IOException {
doIt("nonStartEvent", Optional.of("manolo"), "move");
Expand Down

0 comments on commit ff3b8b6

Please sign in to comment.