Skip to content

Commit

Permalink
fail exception handlers
Browse files Browse the repository at this point in the history
  • Loading branch information
rtc11 committed Mar 21, 2024
1 parent 0cc188a commit 3eb672f
Showing 1 changed file with 44 additions and 29 deletions.
73 changes: 44 additions & 29 deletions kafka-streams/main/libs/kafka/StreamsException.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.streams.errors.DeserializationExceptionHandler
import org.apache.kafka.streams.errors.ProductionExceptionHandler
import org.apache.kafka.streams.errors.ProductionExceptionHandler.ProductionExceptionHandlerResponse.CONTINUE
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT
import org.apache.kafka.streams.processor.ProcessorContext
import org.slf4j.LoggerFactory
import org.apache.kafka.streams.errors.DeserializationExceptionHandler.DeserializationHandlerResponse as ConsumerHandler
import org.apache.kafka.streams.errors.ProductionExceptionHandler.ProductionExceptionHandlerResponse as ProducerHandler
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse as StreamHandler

private val secureLog = LoggerFactory.getLogger("secureLog")

Expand All @@ -22,17 +22,26 @@ class ReplaceThread(message: Any) : RuntimeException(message.toString())
* Exceptions during deserialization, networks issues etc.
*/
class EntryPointExceptionHandler : DeserializationExceptionHandler {
override fun handle(context: ProcessorContext, record: ConsumerRecord<ByteArray, ByteArray>, exception: Exception) =
DeserializationExceptionHandler.DeserializationHandlerResponse.CONTINUE.also {
secureLog.warn(
"Exception reading from kafka: taskId: {}, topic: {}, partition: {}, offset: {}",
context.taskId(), record.topic(), record.partition(), record.offset(), exception
)
}
override fun handle(
context: ProcessorContext,
record: ConsumerRecord<ByteArray, ByteArray>,
exception: Exception,
): DeserializationExceptionHandler.DeserializationHandlerResponse {
secureLog.warn(
"""
Exception deserializing record
Topic: ${record.topic()}
Partition: ${record.partition()}
Offset: ${record.offset()}
TaskId: ${context.taskId()}
""".trimIndent(),
exception
)

override fun configure(configs: MutableMap<String, *>) {
// nothing
return ConsumerHandler.FAIL
}

override fun configure(configs: MutableMap<String, *>) {}
}

/**
Expand All @@ -45,23 +54,28 @@ class EntryPointExceptionHandler : DeserializationExceptionHandler {
* 3. shutdown all streams instances (with the same application-id
*/
class ProcessingExceptionHandler : StreamsUncaughtExceptionHandler {
override fun handle(exception: Throwable): StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse {
override fun handle(
exception: Throwable,
): StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse {
return when (exception.cause) {
is ReplaceThread -> logAndReplaceThread(exception)
null -> logAndShutdownClient(exception)
else -> logAndShutdownClient(exception)
}
}

private fun logAndReplaceThread(err: Throwable) =
REPLACE_THREAD.also {
secureLog.error("Uventet feil, logger og leser neste record, ${err.message}")
}
private fun logAndReplaceThread(
err: Throwable,
): StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse {
secureLog.error("Feil ved prosessering av record, logger og leser neste record", err)
return StreamHandler.REPLACE_THREAD
}

private fun logAndShutdownClient(err: Throwable) =
SHUTDOWN_CLIENT.also {
secureLog.error("Uventet feil, logger og avslutter client, ${err.message}")
}
private fun logAndShutdownClient(
err: Throwable,
): StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse {
secureLog.error("Uventet feil, logger og avslutter client", err)
return StreamHandler.SHUTDOWN_CLIENT
}
}

/**
Expand All @@ -70,12 +84,13 @@ class ProcessingExceptionHandler : StreamsUncaughtExceptionHandler {
* Exceptions due to serialization, networking etc.
*/
class ExitPointExceptionHandler : ProductionExceptionHandler {
override fun handle(record: ProducerRecord<ByteArray, ByteArray>, exception: Exception) =
CONTINUE.also {
secureLog.error("Feil i streams, logger og leser neste record", exception)
}

override fun configure(configs: MutableMap<String, *>) {
// nothing
override fun handle(
record: ProducerRecord<ByteArray, ByteArray>,
exception: Exception,
): ProductionExceptionHandler.ProductionExceptionHandlerResponse {
secureLog.error("Feil i streams, logger og leser neste record", exception)
return ProducerHandler.FAIL
}

override fun configure(configs: MutableMap<String, *>) {}
}

0 comments on commit 3eb672f

Please sign in to comment.