Skip to content

Commit

Permalink
[KOGITO-9276] Adding support for businnesKey to resume a process
Browse files Browse the repository at this point in the history
  • Loading branch information
fjtirado committed Feb 22, 2024
1 parent 40f080f commit eede743
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import org.kie.kogito.correlation.SimpleCorrelation;
import org.kie.kogito.event.DataEvent;
import org.kie.kogito.event.EventDispatcher;
import org.kie.kogito.internal.utils.ConversionUtils;
import org.kie.kogito.event.cloudevents.CloudEventExtensionConstants;
import org.kie.kogito.process.Process;
import org.kie.kogito.process.ProcessInstance;
import org.kie.kogito.process.ProcessService;
Expand Down Expand Up @@ -74,31 +74,31 @@ public CompletableFuture<ProcessInstance<M>> dispatch(String trigger, DataEvent<
}
return CompletableFuture.completedFuture(null);
}

final String kogitoReferenceId = resolveCorrelationId(event);
if (!ConversionUtils.isEmpty(kogitoReferenceId)) {
return CompletableFuture.supplyAsync(() -> handleMessageWithReference(trigger, event, kogitoReferenceId), executor);
}

//if the trigger is for a start event (model converter is set only for start node)
if (modelConverter.isPresent()) {
return CompletableFuture.supplyAsync(() -> startNewInstance(trigger, event), executor);
}
if (LOGGER.isInfoEnabled()) {
LOGGER.info("No matches found for trigger {} in process {}. Skipping consumed message {}", trigger, process.id(), event);
}
return CompletableFuture.completedFuture(null);
return resolveCorrelationId(event)
.map(kogitoReferenceId -> CompletableFuture.supplyAsync(() -> handleMessageWithReference(trigger, event, kogitoReferenceId), executor))
.orElseGet(() -> {
// if the trigger is for a start event (model converter is set only for start node)
if (modelConverter.isPresent()) {
return CompletableFuture.supplyAsync(() -> startNewInstance(trigger, event), executor);
} else {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("No matches found for trigger {} in process {}. Skipping consumed message {}", trigger, process.id(), event);
}
return CompletableFuture.completedFuture(null);
}
});
}

private Optional<CompositeCorrelation> compositeCorrelation(DataEvent<?> event) {
return correlationKeys != null && !correlationKeys.isEmpty() ? Optional.of(new CompositeCorrelation(
correlationKeys.stream().map(k -> new SimpleCorrelation<>(k, resolve(event, k))).collect(Collectors.toSet()))) : Optional.empty();
}

private String resolveCorrelationId(DataEvent<?> event) {
private Optional<String> resolveCorrelationId(DataEvent<?> event) {
return compositeCorrelation(event).flatMap(process.correlations()::find)
.map(CorrelationInstance::getCorrelatedId)
.orElseGet(event::getKogitoReferenceId);
.or(() -> Optional.ofNullable((String) event.getExtension(CloudEventExtensionConstants.BUSINESS_KEY)))
.or(() -> Optional.ofNullable(event.getKogitoReferenceId()));
}

private Object resolve(DataEvent<?> event, String key) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,16 @@
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.Collections;
import java.util.Optional;
import java.util.UUID;

import org.kie.kogito.event.CloudEventMarshaller;
import org.kie.kogito.event.cloudevents.CloudEventExtensionConstants;

import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.restassured.http.ContentType;
import io.restassured.specification.RequestSpecification;

import static io.restassured.RestAssured.given;
import static java.util.concurrent.TimeUnit.SECONDS;
Expand All @@ -40,8 +43,11 @@ private AssuredTestUtils() {
}

static String startProcess(String flowName) {
String id = startProcessNoCheck(flowName);
return startProcess(flowName, Optional.empty());
}

static String startProcess(String flowName, Optional<String> businessKey) {
String id = startProcessNoCheck(flowName, businessKey);
given()
.contentType(ContentType.JSON)
.accept(ContentType.JSON)
Expand All @@ -52,11 +58,16 @@ static String startProcess(String flowName) {
}

static String startProcessNoCheck(String flowName) {
return given()
return startProcessNoCheck(flowName, Optional.empty());
}

static String startProcessNoCheck(String flowName, Optional<String> businessKey) {
RequestSpecification body = given()
.contentType(ContentType.JSON)
.when()
.body(Collections.singletonMap("workflowdata", Collections.emptyMap()))
.post("/" + flowName)
.body(Collections.singletonMap("workflowdata", Collections.emptyMap()));
businessKey.ifPresent(key -> body.queryParam("businessKey", key));
return body.post("/" + flowName)
.then()
.statusCode(201)
.extract().path("id");
Expand All @@ -73,15 +84,19 @@ static void waitForFinish(String flowName, String id, Duration duration) {
.statusCode(404));
}

static CloudEvent buildCloudEvent(String id, String type, CloudEventMarshaller<byte[]> marshaller) {
return CloudEventBuilder.v1()
static CloudEvent buildCloudEvent(String id, Optional<String> businessKey, String type, CloudEventMarshaller<byte[]> marshaller) {
io.cloudevents.core.v1.CloudEventBuilder builder = CloudEventBuilder.v1()
.withId(UUID.randomUUID().toString())
.withSource(URI.create(""))
.withType(type)
.withTime(OffsetDateTime.now())
.withExtension("kogitoprocrefid", id)
.withData(marshaller.cloudEventDataFactory().apply(Collections.singletonMap(type, "This has been injected by the event")))
.build();
.withData(marshaller.cloudEventDataFactory().apply(Collections.singletonMap(type, "This has been injected by the event")));
businessKey.ifPresentOrElse(key -> builder.withExtension(CloudEventExtensionConstants.BUSINESS_KEY, key), () -> builder.withExtension(CloudEventExtensionConstants.PROCESS_REFERENCE_ID, id));
return builder.build();
}

static CloudEvent buildCloudEvent(String id, String type, CloudEventMarshaller<byte[]> marshaller) {
return buildCloudEvent(id, Optional.empty(), type, marshaller);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.IOException;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;

import org.awaitility.core.ConditionTimeoutException;
import org.junit.jupiter.api.BeforeAll;
Expand Down Expand Up @@ -57,7 +58,7 @@ static void init() {

@Test
void testNotStartingEvent() throws IOException {
doIt("nonStartEvent", "move");
doIt("nonStartEvent", Optional.of("manolo"), "move");
}

@Test
Expand Down Expand Up @@ -108,25 +109,35 @@ void testNotStartingMultipleEventRainy() throws IOException {
}

private void sendEvents(String id, String... eventTypes) throws IOException {
sendEvents(id, Optional.empty(), eventTypes);
}

private void sendEvents(String id, Optional<String> businessKey, String... eventTypes) throws IOException {
for (String eventType : eventTypes) {
given()
.contentType(ContentType.JSON)
.when()
.body(generateCloudEvent(id, eventType))
.body(generateCloudEvent(id, businessKey, eventType))
.post("/" + eventType)
.then()
.statusCode(202);
}
}

private void doIt(String flowName, Optional<String> businessKey, String... eventTypes) throws IOException {
String id = startProcess(flowName, businessKey);
sendEvents(id, businessKey, eventTypes);
waitForFinish(flowName, id, Duration.ofSeconds(15));
}

private void doIt(String flowName, String... eventTypes) throws IOException {
String id = startProcess(flowName);
sendEvents(id, eventTypes);
waitForFinish(flowName, id, Duration.ofSeconds(15));
}

private byte[] generateCloudEvent(String id, String type) throws IOException {
private byte[] generateCloudEvent(String id, Optional<String> businessKey, String type) throws IOException {
CloudEventMarshaller<byte[]> marshaller = marshallers.getOrDefault(type, defaultMarshaller);
return marshaller.marshall(buildCloudEvent(id, type, marshaller));
return marshaller.marshall(buildCloudEvent(id, businessKey, type, marshaller));
}
}

0 comments on commit eede743

Please sign in to comment.