Blocking join on SolaceErrorTopic failure handler #10

Closed
ozangunalp opened this issue Dec 20, 2023 · 6 comments · Fixed by #14

@ozangunalp
Collaborator

https://github.com/SolaceCoEExt/solace-quarkus/blob/76d9f4fe7052f2c4e5eff06ed9077de93036ca6f/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/fault/SolaceErrorTopic.java#L52-L70

Subscribing to the CompletionStage and calling join() blocks the calling thread, which most of the time is the event-loop thread. Instead, the send operation should be chained with the receipt of the publish receipt, and the handle method should return its result on the message context, as is already done.
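
To illustrate the difference between joining and chaining, here is a minimal sketch using only java.util.concurrent. The names publish, handleBlocking and handleChained are hypothetical stand-ins and not the connector's API; the point is only that the chained variant returns a stage to the caller instead of parking the calling thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-ins: none of these methods belong to the Solace connector.
class ChainInsteadOfJoin {

    // Simulates an asynchronous publish that completes later with a receipt.
    static CompletableFuture<String> publish(String payload) {
        return CompletableFuture.supplyAsync(() -> "receipt-for-" + payload);
    }

    // Blocking variant: join() parks the calling thread until the receipt arrives.
    // On an event-loop thread this stalls every other task scheduled on that loop.
    static void handleBlocking(String payload, AtomicReference<String> settled) {
        String receipt = publish(payload).join();
        settled.set(receipt);
    }

    // Chained variant: the follow-up step is registered as a continuation and the
    // stage is handed back to the caller, so the calling thread is never blocked.
    static CompletionStage<Void> handleChained(String payload, AtomicReference<String> settled) {
        return publish(payload).thenAccept(settled::set);
    }

    public static void main(String[] args) {
        AtomicReference<String> settled = new AtomicReference<>();
        CompletionStage<Void> done = handleChained("msg-1", settled);
        // Only this demo's main thread waits; a real handler would just return `done`.
        done.toCompletableFuture().join();
        System.out.println(settled.get());
    }
}
```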

@SravanThotakura05 SravanThotakura05 self-assigned this Dec 20, 2023
@ozangunalp
Collaborator Author

@SravanThotakura05 This needs more testing, but a simple rewrite of this code without joining would look like the following:

    @Override
    public CompletionStage<Void> handle(SolaceInboundMessage<?> msg, Throwable reason, Metadata metadata) {
        return solaceErrorTopicPublisherHandler.handle(msg, errorTopic, dmqEligible, timeToLive)
                .onFailure().retry().withBackOff(Duration.ofSeconds(1))
                .atMost(maxDeliveryAttempts)
                .onItem().invoke(() -> ackSupport.settle(msg.getMessage(), Outcome.ACCEPTED))
                .replaceWithVoid()
                .onFailure().invoke(t -> SolaceLogging.log.unsuccessfulToTopic(errorTopic, channel, t))
                .emitOn(msg::runOnMessageContext)
                .subscribeAsCompletionStage();
    }

SravanThotakura05 added a commit that referenced this issue Jan 1, 2024
@SravanThotakura05
Collaborator

onFailure().invoke(t -> SolaceLogging.log.unsuccessfulToTopic(errorTopic, channel, t))

@ozangunalp - the onFailure().invoke(...) callback doesn't get triggered after all attempts are exhausted. Apart from that, this code handles publishing (and publish failures) to the error topic as expected.

@SravanThotakura05 SravanThotakura05 mentioned this issue Jan 1, 2024
@ozangunalp
Collaborator Author

Normally, the last failure is propagated downstream with the retry operator, which should trigger the logging.
Can we write a test case for that?
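
As a rough illustration of that expectation, the sketch below uses plain CompletableFuture rather than Mutiny, so it is only an analogy for the retry operator and not a statement about Mutiny's exact attempt counting. The helper names retry, attempt and alwaysFailingPublish are hypothetical. It shows that once the retries are used up, the last failure completes the returned future, so a failure callback registered downstream still runs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Plain-JDK analogy of "retry, then propagate the last failure". Not Mutiny.
class RetryPropagatesLastFailure {

    // Runs the action once, then re-runs it up to maxRetries more times on failure.
    static <T> CompletableFuture<T> retry(Supplier<CompletableFuture<T>> action, int maxRetries) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(action, maxRetries, result);
        return result;
    }

    private static <T> void attempt(Supplier<CompletableFuture<T>> action, int retriesLeft,
            CompletableFuture<T> result) {
        action.get().whenComplete((item, failure) -> {
            if (failure == null) {
                result.complete(item);
            } else if (retriesLeft > 0) {
                attempt(action, retriesLeft - 1, result);
            } else {
                // Retries exhausted: hand the last failure to downstream callbacks.
                result.completeExceptionally(failure);
            }
        });
    }

    // Demo action (hypothetical): always fails and numbers its attempts.
    static CompletableFuture<Void> alwaysFailingPublish(AtomicInteger attempts) {
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom-" + attempts.incrementAndGet()));
        return failed;
    }

    public static void main(String[] args) {
        AtomicInteger attempts = new AtomicInteger();
        List<String> logged = new ArrayList<>();
        retry(() -> alwaysFailingPublish(attempts), 2)
                // Stands in for the failure-logging step at the end of the pipeline.
                .whenComplete((ignored, failure) -> {
                    if (failure != null) {
                        logged.add(failure.getMessage());
                    }
                });
        System.out.println(attempts.get() + " attempts, logged: " + logged);
    }
}
```

A test along these lines can assert both the number of attempts and that the failure callback saw the final error.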

@SravanThotakura05
Collaborator

SravanThotakura05 commented Jan 4, 2024

@ozangunalp Yes, but I had to make the following changes for the test to assert on logs.

Added the following to pom.xml:

    <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-junit5-internal</artifactId>
        <scope>test</scope>
    </dependency>

Changed the following in pom.xml, from

    <java.util.logging.manager>java.util.logging.LogManager</java.util.logging.manager>

to

    <java.util.logging.manager>org.jboss.logmanager.LogManager</java.util.logging.manager>

A NullPointerException is thrown for rootLogger in the code below if the logging manager is set to java.util.logging.LogManager.

In the test case I have added the following to assert on logs:

    private static final java.util.logging.Logger rootLogger = LogManager.getLogManager().getLogger("io.quarkiverse.solace");
    public static final InMemoryLogHandler inMemoryLogHandler = new InMemoryLogHandler(
            record -> record.getLevel().intValue() >= Level.SEVERE.intValue());

    static {
        rootLogger.addHandler(inMemoryLogHandler);
    }

    // In the test method:
    assertThat(inMemoryLogHandler.getRecords().stream().anyMatch(record -> record.getMessage().contains("Publishing error message to topic"))).isEqualTo(true);
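
For comparison, here is a minimal, self-contained sketch of the same idea using only java.util.logging. CapturingHandler and the logger name example.capture are hypothetical and stand in for InMemoryLogHandler and the connector's logger. It obtains the logger with Logger.getLogger, which creates the logger on demand, whereas LogManager#getLogger is documented to return null when no logger with that name is registered; that difference is one possible source of a NullPointerException like the one described above, though it is not confirmed as the cause here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

// Hypothetical in-memory handler that keeps only SEVERE (or higher) records.
class CapturingHandler extends Handler {

    private final List<LogRecord> records = new ArrayList<>();

    @Override
    public synchronized void publish(LogRecord record) {
        if (record.getLevel().intValue() >= Level.SEVERE.intValue()) {
            records.add(record);
        }
    }

    @Override
    public void flush() {
        // Nothing is buffered outside of memory.
    }

    @Override
    public void close() {
        // No resources to release.
    }

    // True if any captured record's raw message contains the given fragment.
    synchronized boolean anyMessageContains(String fragment) {
        return records.stream()
                .anyMatch(r -> r.getMessage() != null && r.getMessage().contains(fragment));
    }

    public static void main(String[] args) {
        // Logger.getLogger creates the logger if it does not exist yet.
        Logger logger = Logger.getLogger("example.capture");
        logger.setUseParentHandlers(false); // keep the demo's console output quiet
        CapturingHandler handler = new CapturingHandler();
        logger.addHandler(handler);

        logger.severe("Publishing error message to topic failed (demo message)");
        logger.info("not captured, below SEVERE");

        System.out.println(handler.anyMessageContains("Publishing error message to topic"));
    }
}
```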

@SravanThotakura05 SravanThotakura05 linked a pull request Jan 4, 2024 that will close this issue
@ozangunalp
Collaborator Author

ozangunalp commented Jan 4, 2024

I can see that the error is logged by copying the test consumerFailedProcessingPublishToErrorTopic and running it with a non-existent error topic and max retries set to 2.
You can verify that no failed messages were received in the error-in channel.
I don't think that using the JBoss log manager would work in these tests. If you'd like to check the error is logged, you can get the SolaceLogging logger backend from Log4j directly.

    Logger logger = org.apache.log4j.Logger.getLogger("io.quarkiverse.solace");
    logger.addAppender(new AppenderSkeleton() {
    ...
    });

@SravanThotakura05
Collaborator

Okay, I assumed we needed to use the JBoss log manager since our logging is based on it, and that worked. I have updated the test as per your comment. The changes are in the same PR: https://github.com/SolaceCoEExt/solace-quarkus/pull/14/files
