Skip to content

Commit

Permalink
[Fix_#3366] Making exception propagation for event publishing
Browse files Browse the repository at this point in the history
configurable
  • Loading branch information
fjtirado committed Jan 29, 2024
1 parent 8424c00 commit 2c27828
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,36 @@ public class EventsRuntimeConfig {
@ConfigItem(name = "processinstances.enabled", defaultValue = "true")
boolean processInstancesEventsEnabled;

/**
* Propagate errors for process instance emitter
*/
@ConfigItem(name = "processinstances.errors.propagate", defaultValue = "false")
boolean processInstancesPropagate;

/**
* Enable publishing processes definition events
*/
@ConfigItem(name = "processdefinitions.enabled", defaultValue = "true")
boolean processDefinitionEventsEnabled;

/**
* Propagate errors for process definition emitter
*/
@ConfigItem(name = "processdefinition.errors.propagate", defaultValue = "false")
boolean processDefinitionPropagate;

/**
* Enable publishing user task instances events
*/
@ConfigItem(name = "usertasks.enabled", defaultValue = "true")
boolean userTasksEventsEnabled;

/**
* Propagate errors for user task emitter
*/
@ConfigItem(name = "usertasks.errors.propagate", defaultValue = "false")
boolean userTasksPropagate;

public boolean isProcessInstancesEventsEnabled() {
return processInstancesEventsEnabled;
}
Expand All @@ -55,4 +73,16 @@ public boolean isUserTasksEventsEnabled() {
return userTasksEventsEnabled;
}

public boolean isProcessInstancesPropagateError() {
return processInstancesPropagate;
}

public boolean isProcessDefinitionPropagateError() {
return processDefinitionPropagate;
}

public boolean isUserTasksPropagateError() {
return userTasksPropagate;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.BiConsumer;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Message;
Expand Down Expand Up @@ -52,14 +53,17 @@ public class ReactiveMessagingEventPublisher implements EventPublisher {
@Inject
@Channel(PROCESS_INSTANCES_TOPIC_NAME)
MutinyEmitter<String> processInstancesEventsEmitter;
private BiConsumer<MutinyEmitter<String>, Message<String>> processInstanceConsumer;

@Inject
@Channel(PROCESS_DEFINITIONS_TOPIC_NAME)
MutinyEmitter<String> processDefinitionEventsEmitter;
private BiConsumer<MutinyEmitter<String>, Message<String>> processDefinitionConsumer;

@Inject
@Channel(USER_TASK_INSTANCES_TOPIC_NAME)
MutinyEmitter<String> userTasksEventsEmitter;
private BiConsumer<MutinyEmitter<String>, Message<String>> userTaskConsumer;
@Inject
EventsRuntimeConfig eventsRuntimeConfig;

Expand All @@ -71,6 +75,9 @@ public class ReactiveMessagingEventPublisher implements EventPublisher {
@PostConstruct
public void init() {
decoratorProvider = decoratorProviderInstance.isResolvable() ? decoratorProviderInstance.get() : null;
processInstanceConsumer = eventsRuntimeConfig.isProcessInstancesPropagateError() ? new BlockingMessageEmitter() : new ReactiveMessageEmitter();
processDefinitionConsumer = eventsRuntimeConfig.isProcessDefinitionPropagateError() ? new BlockingMessageEmitter() : new ReactiveMessageEmitter();
userTaskConsumer = eventsRuntimeConfig.isUserTasksPropagateError() ? new BlockingMessageEmitter() : new ReactiveMessageEmitter();
}

@Override
Expand All @@ -79,7 +86,7 @@ public void publish(DataEvent<?> event) {
switch (event.getType()) {
case "ProcessDefinitionEvent":
if (eventsRuntimeConfig.isProcessDefinitionEventsEnabled()) {
publishToTopic(event, processDefinitionEventsEmitter, PROCESS_DEFINITIONS_TOPIC_NAME);
publishToTopic(processDefinitionConsumer, event, processDefinitionEventsEmitter, PROCESS_DEFINITIONS_TOPIC_NAME);
}
break;
case "ProcessInstanceErrorDataEvent":
Expand All @@ -88,7 +95,7 @@ public void publish(DataEvent<?> event) {
case "ProcessInstanceStateDataEvent":
case "ProcessInstanceVariableDataEvent":
if (eventsRuntimeConfig.isProcessInstancesEventsEnabled()) {
publishToTopic(event, processInstancesEventsEmitter, PROCESS_INSTANCES_TOPIC_NAME);
publishToTopic(processInstanceConsumer, event, processInstancesEventsEmitter, PROCESS_INSTANCES_TOPIC_NAME);
}
break;

Expand All @@ -99,7 +106,7 @@ public void publish(DataEvent<?> event) {
case "UserTaskInstanceStateDataEvent":
case "UserTaskInstanceVariableDataEvent":
if (eventsRuntimeConfig.isUserTasksEventsEnabled()) {
publishToTopic(event, userTasksEventsEmitter, USER_TASK_INSTANCES_TOPIC_NAME);
publishToTopic(userTaskConsumer, event, userTasksEventsEmitter, USER_TASK_INSTANCES_TOPIC_NAME);
}
break;
default:
Expand All @@ -114,31 +121,48 @@ public void publish(Collection<DataEvent<?>> events) {
}
}

protected void publishToTopic(DataEvent<?> event, MutinyEmitter<String> emitter, String topic) {
protected void publishToTopic(BiConsumer<MutinyEmitter<String>, Message<String>> consumer, DataEvent<?> event, MutinyEmitter<String> emitter, String topic) {
logger.debug("About to publish event {} to topic {}", event, topic);
Message<String> message = null;
try {
String eventString = json.writeValueAsString(event);
Message<String> message = decorateMessage(ContextAwareMessage.of(eventString));

logger.debug("Event payload '{}'", eventString);
emitter.sendMessageAndAwait(message);

message = decorateMessage(ContextAwareMessage.of(eventString));
} catch (Exception e) {
logger.error("Error while creating event to topic {} for event {}", topic, event, e);
logger.error("Error while creating event to topic {} for event {}", topic, event);
}
if (message != null) {
consumer.accept(emitter, message);
}
}

protected CompletionStage<Void> onAck(DataEvent<?> event, String topic) {
logger.debug("Successfully published event {} to topic {}", event, topic);
protected CompletionStage<Void> onAck(Message<String> message) {
logger.debug("Successfully published message {}", message);
return CompletableFuture.completedFuture(null);
}

protected CompletionStage<Void> onNack(Throwable reason, DataEvent<?> event, String topic) {
logger.error("Error while publishing event to topic {} for event {}", topic, event, reason);
protected CompletionStage<Void> onNack(Throwable reason, Message<String> message) {
logger.error("Error while publishing message {}", message, reason);
return CompletableFuture.completedFuture(null);
}

protected Message<String> decorateMessage(Message<String> message) {
return decoratorProvider != null ? decoratorProvider.decorate(message) : message;
}

private class BlockingMessageEmitter implements BiConsumer<MutinyEmitter<String>, Message<String>> {
@Override
public void accept(MutinyEmitter<String> emitter, Message<String> message) {
emitter.sendMessageAndAwait(message);
}
}

private class ReactiveMessageEmitter implements BiConsumer<MutinyEmitter<String>, Message<String>> {
@Override
public void accept(MutinyEmitter<String> emitter, Message<String> message) {
emitter.sendMessageAndForget(message
.withAck(() -> onAck(message))
.withNack(reason -> onNack(reason, message)));
}
}
}

0 comments on commit 2c27828

Please sign in to comment.