From c5cf147442e00a52828409b8e5d4d80909bbb62e Mon Sep 17 00:00:00 2001 From: Adrian Paschkowski Date: Sun, 28 Jan 2024 13:52:22 +0100 Subject: [PATCH] Add more error handling --- .../gg/beemo/latte/broker/BrokerSubclients.kt | 22 +++++++++++++++++-- .../gg/beemo/latte/broker/BrokerClientTest.kt | 2 +- 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/latte/src/main/java/gg/beemo/latte/broker/BrokerSubclients.kt b/latte/src/main/java/gg/beemo/latte/broker/BrokerSubclients.kt index b1bb99c..4fb48c0 100644 --- a/latte/src/main/java/gg/beemo/latte/broker/BrokerSubclients.kt +++ b/latte/src/main/java/gg/beemo/latte/broker/BrokerSubclients.kt @@ -24,7 +24,6 @@ data class BrokerClientOptions( val useSafeJsLongs: Boolean = false, ) -// TODO Add error handling, some try-finally to close the producer/consumer even with errors sealed class BaseSubclient( protected val connection: BrokerConnection, protected val client: BrokerClient, @@ -162,7 +161,16 @@ class ConsumerSubclient( value, ) @Suppress("UNCHECKED_CAST") // Safe due to above null validation - callback(message as BaseBrokerMessage) + val brokerMessage = message as BaseBrokerMessage + try { + callback(brokerMessage) + } catch (ex: Exception) { + log.error( + "Uncaught consumer callback error while processing message ${headers.messageId} " + + "with key '$key' in topic '$topic'", + ex, + ) + } } private fun parseIncoming(json: String): T? { @@ -191,6 +199,8 @@ class RpcClient( options, ) { + private val log by Log + private val requestProducer = client.producer(topic, key, options, requestType, requestIsNullable) private val requestConsumer = client.consumer(topic, key, options, requestType, requestIsNullable) { msg -> val responseProducer = client.producer( @@ -231,6 +241,13 @@ class RpcClient( } catch (ex: RpcException) { sendResponse(null, ex.status, true, isUpdate = false) return@consumer + } catch (ex: Exception) { + log.error( + "Uncaught RPC callback error while processing message ${msg.headers.messageId} " + + "with key '$key' in topic '$topic'", + ex, + ) + return@consumer } finally { responseProducer.destroy() } @@ -274,6 +291,7 @@ class RpcClient( if (msg.headers.inReplyTo != messageId.get()) { return@consumer } + // Close the flow if we receive an exception if (msg.headers.isException) { close(RpcException(msg.headers.status)) return@consumer diff --git a/latte/src/test/kotlin/gg/beemo/latte/broker/BrokerClientTest.kt b/latte/src/test/kotlin/gg/beemo/latte/broker/BrokerClientTest.kt index 3d7716b..1bb6a36 100644 --- a/latte/src/test/kotlin/gg/beemo/latte/broker/BrokerClientTest.kt +++ b/latte/src/test/kotlin/gg/beemo/latte/broker/BrokerClientTest.kt @@ -7,7 +7,7 @@ import org.junit.jupiter.api.assertThrows class BrokerClientTest { - private val connection = LocalConnection() + private val connection = LocalConnection("service", "instance") @Test fun `test greeting RPC`() = withTestClient { client ->