diff --git a/quarkus/addons/events/process/runtime/src/main/java/org/kie/kogito/events/config/EventsRuntimeConfig.java b/quarkus/addons/events/process/runtime/src/main/java/org/kie/kogito/events/config/EventsRuntimeConfig.java index 18368bcd545..116eef84229 100644 --- a/quarkus/addons/events/process/runtime/src/main/java/org/kie/kogito/events/config/EventsRuntimeConfig.java +++ b/quarkus/addons/events/process/runtime/src/main/java/org/kie/kogito/events/config/EventsRuntimeConfig.java @@ -31,18 +31,36 @@ public class EventsRuntimeConfig { @ConfigItem(name = "processinstances.enabled", defaultValue = "true") boolean processInstancesEventsEnabled; + /** + * Propagate errors for process instance emitter + */ + @ConfigItem(name = "processinstances.errors.propagate", defaultValue = "false") + boolean processInstancesPropagate; + /** * Enable publishing processes definition events */ @ConfigItem(name = "processdefinitions.enabled", defaultValue = "true") boolean processDefinitionEventsEnabled; + /** + * Propagate errors for process definition emitter + */ + @ConfigItem(name = "processdefinition.errors.propagate", defaultValue = "false") + boolean processDefinitionPropagate; + /** * Enable publishing user task instances events */ @ConfigItem(name = "usertasks.enabled", defaultValue = "true") boolean userTasksEventsEnabled; + /** + * Propagate errors for user task emitter + */ + @ConfigItem(name = "usertasks.errors.propagate", defaultValue = "false") + boolean userTasksPropagate; + public boolean isProcessInstancesEventsEnabled() { return processInstancesEventsEnabled; } @@ -55,4 +73,16 @@ public boolean isUserTasksEventsEnabled() { return userTasksEventsEnabled; } + public boolean isProcessInstancesPropagateError() { + return processInstancesPropagate; + } + + public boolean isProcessDefinitionPropagateError() { + return processDefinitionPropagate; + } + + public boolean isUserTasksPropagateError() { + return userTasksPropagate; + } + } diff --git a/quarkus/addons/events/process/runtime/src/main/java/org/kie/kogito/events/process/ReactiveMessagingEventPublisher.java b/quarkus/addons/events/process/runtime/src/main/java/org/kie/kogito/events/process/ReactiveMessagingEventPublisher.java index 1b2b56ada1e..31ff4acd08a 100644 --- a/quarkus/addons/events/process/runtime/src/main/java/org/kie/kogito/events/process/ReactiveMessagingEventPublisher.java +++ b/quarkus/addons/events/process/runtime/src/main/java/org/kie/kogito/events/process/ReactiveMessagingEventPublisher.java @@ -21,6 +21,7 @@ import java.util.Collection; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.function.BiConsumer; import org.eclipse.microprofile.reactive.messaging.Channel; import org.eclipse.microprofile.reactive.messaging.Message; @@ -52,14 +53,17 @@ public class ReactiveMessagingEventPublisher implements EventPublisher { @Inject @Channel(PROCESS_INSTANCES_TOPIC_NAME) MutinyEmitter processInstancesEventsEmitter; + private BiConsumer, Message> processInstanceConsumer; @Inject @Channel(PROCESS_DEFINITIONS_TOPIC_NAME) MutinyEmitter processDefinitionEventsEmitter; + private BiConsumer, Message> processDefinitionConsumer; @Inject @Channel(USER_TASK_INSTANCES_TOPIC_NAME) MutinyEmitter userTasksEventsEmitter; + private BiConsumer, Message> userTaskConsumer; @Inject EventsRuntimeConfig eventsRuntimeConfig; @@ -71,6 +75,9 @@ public class ReactiveMessagingEventPublisher implements EventPublisher { @PostConstruct public void init() { decoratorProvider = decoratorProviderInstance.isResolvable() ? decoratorProviderInstance.get() : null; + processInstanceConsumer = eventsRuntimeConfig.isProcessInstancesPropagateError() ? new BlockingMessageEmitter() : new ReactiveMessageEmitter(); + processDefinitionConsumer = eventsRuntimeConfig.isProcessDefinitionPropagateError() ? new BlockingMessageEmitter() : new ReactiveMessageEmitter(); + userTaskConsumer = eventsRuntimeConfig.isUserTasksPropagateError() ? new BlockingMessageEmitter() : new ReactiveMessageEmitter(); } @Override @@ -79,7 +86,7 @@ public void publish(DataEvent event) { switch (event.getType()) { case "ProcessDefinitionEvent": if (eventsRuntimeConfig.isProcessDefinitionEventsEnabled()) { - publishToTopic(event, processDefinitionEventsEmitter, PROCESS_DEFINITIONS_TOPIC_NAME); + publishToTopic(processDefinitionConsumer, event, processDefinitionEventsEmitter, PROCESS_DEFINITIONS_TOPIC_NAME); } break; case "ProcessInstanceErrorDataEvent": @@ -88,7 +95,7 @@ public void publish(DataEvent event) { case "ProcessInstanceStateDataEvent": case "ProcessInstanceVariableDataEvent": if (eventsRuntimeConfig.isProcessInstancesEventsEnabled()) { - publishToTopic(event, processInstancesEventsEmitter, PROCESS_INSTANCES_TOPIC_NAME); + publishToTopic(processInstanceConsumer, event, processInstancesEventsEmitter, PROCESS_INSTANCES_TOPIC_NAME); } break; @@ -99,7 +106,7 @@ public void publish(DataEvent event) { case "UserTaskInstanceStateDataEvent": case "UserTaskInstanceVariableDataEvent": if (eventsRuntimeConfig.isUserTasksEventsEnabled()) { - publishToTopic(event, userTasksEventsEmitter, USER_TASK_INSTANCES_TOPIC_NAME); + publishToTopic(userTaskConsumer, event, userTasksEventsEmitter, USER_TASK_INSTANCES_TOPIC_NAME); } break; default: @@ -114,31 +121,48 @@ public void publish(Collection> events) { } } - protected void publishToTopic(DataEvent event, MutinyEmitter emitter, String topic) { + protected void publishToTopic(BiConsumer, Message> consumer, DataEvent event, MutinyEmitter emitter, String topic) { logger.debug("About to publish event {} to topic {}", event, topic); + Message message = null; try { String eventString = json.writeValueAsString(event); - Message message = decorateMessage(ContextAwareMessage.of(eventString)); - logger.debug("Event payload '{}'", eventString); - emitter.sendMessageAndAwait(message); - + message = decorateMessage(ContextAwareMessage.of(eventString)); } catch (Exception e) { - logger.error("Error while creating event to topic {} for event {}", topic, event, e); + logger.error("Error while creating event to topic {} for event {}", topic, event); + } + if (message != null) { + consumer.accept(emitter, message); } } - protected CompletionStage onAck(DataEvent event, String topic) { - logger.debug("Successfully published event {} to topic {}", event, topic); + protected CompletionStage onAck(Message message) { + logger.debug("Successfully published message {}", message); return CompletableFuture.completedFuture(null); } - protected CompletionStage onNack(Throwable reason, DataEvent event, String topic) { - logger.error("Error while publishing event to topic {} for event {}", topic, event, reason); + protected CompletionStage onNack(Throwable reason, Message message) { + logger.error("Error while publishing message {}", message, reason); return CompletableFuture.completedFuture(null); } protected Message decorateMessage(Message message) { return decoratorProvider != null ? decoratorProvider.decorate(message) : message; } + + private class BlockingMessageEmitter implements BiConsumer, Message> { + @Override + public void accept(MutinyEmitter emitter, Message message) { + emitter.sendMessageAndAwait(message); + } + } + + private class ReactiveMessageEmitter implements BiConsumer, Message> { + @Override + public void accept(MutinyEmitter emitter, Message message) { + emitter.sendMessageAndForget(message + .withAck(() -> onAck(message)) + .withNack(reason -> onNack(reason, message))); + } + } }