diff --git a/kafka-streams/main/libs/kafka/StreamsException.kt b/kafka-streams/main/libs/kafka/StreamsException.kt index 3f90adc..7205f42 100644 --- a/kafka-streams/main/libs/kafka/StreamsException.kt +++ b/kafka-streams/main/libs/kafka/StreamsException.kt @@ -4,12 +4,12 @@ import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.clients.producer.ProducerRecord import org.apache.kafka.streams.errors.DeserializationExceptionHandler import org.apache.kafka.streams.errors.ProductionExceptionHandler -import org.apache.kafka.streams.errors.ProductionExceptionHandler.ProductionExceptionHandlerResponse.CONTINUE import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler -import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD -import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT import org.apache.kafka.streams.processor.ProcessorContext import org.slf4j.LoggerFactory +import org.apache.kafka.streams.errors.DeserializationExceptionHandler.DeserializationHandlerResponse as ConsumerHandler +import org.apache.kafka.streams.errors.ProductionExceptionHandler.ProductionExceptionHandlerResponse as ProducerHandler +import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse as StreamHandler private val secureLog = LoggerFactory.getLogger("secureLog") @@ -22,17 +22,26 @@ class ReplaceThread(message: Any) : RuntimeException(message.toString()) * Exceptions during deserialization, networks issues etc. */ class EntryPointExceptionHandler : DeserializationExceptionHandler { - override fun handle(context: ProcessorContext, record: ConsumerRecord, exception: Exception) = - DeserializationExceptionHandler.DeserializationHandlerResponse.CONTINUE.also { - secureLog.warn( - "Exception reading from kafka: taskId: {}, topic: {}, partition: {}, offset: {}", - context.taskId(), record.topic(), record.partition(), record.offset(), exception - ) - } + override fun handle( + context: ProcessorContext, + record: ConsumerRecord, + exception: Exception, + ): DeserializationExceptionHandler.DeserializationHandlerResponse { + secureLog.warn( + """ + Exception deserializing record + Topic: ${record.topic()} + Partition: ${record.partition()} + Offset: ${record.offset()} + TaskId: ${context.taskId()} + """.trimIndent(), + exception + ) - override fun configure(configs: MutableMap) { - // nothing + return ConsumerHandler.FAIL } + + override fun configure(configs: MutableMap) {} } /** @@ -45,23 +54,28 @@ class EntryPointExceptionHandler : DeserializationExceptionHandler { * 3. shutdown all streams instances (with the same application-id */ class ProcessingExceptionHandler : StreamsUncaughtExceptionHandler { - override fun handle(exception: Throwable): StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse { + override fun handle( + exception: Throwable, + ): StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse { return when (exception.cause) { is ReplaceThread -> logAndReplaceThread(exception) - null -> logAndShutdownClient(exception) else -> logAndShutdownClient(exception) } } - private fun logAndReplaceThread(err: Throwable) = - REPLACE_THREAD.also { - secureLog.error("Uventet feil, logger og leser neste record, ${err.message}") - } + private fun logAndReplaceThread( + err: Throwable, + ): StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse { + secureLog.error("Feil ved prosessering av record, logger og leser neste record", err) + return StreamHandler.REPLACE_THREAD + } - private fun logAndShutdownClient(err: Throwable) = - SHUTDOWN_CLIENT.also { - secureLog.error("Uventet feil, logger og avslutter client, ${err.message}") - } + private fun logAndShutdownClient( + err: Throwable, + ): StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse { + secureLog.error("Uventet feil, logger og avslutter client", err) + return StreamHandler.SHUTDOWN_CLIENT + } } /** @@ -70,12 +84,13 @@ class ProcessingExceptionHandler : StreamsUncaughtExceptionHandler { * Exceptions due to serialization, networking etc. */ class ExitPointExceptionHandler : ProductionExceptionHandler { - override fun handle(record: ProducerRecord, exception: Exception) = - CONTINUE.also { - secureLog.error("Feil i streams, logger og leser neste record", exception) - } - - override fun configure(configs: MutableMap) { - // nothing + override fun handle( + record: ProducerRecord, + exception: Exception, + ): ProductionExceptionHandler.ProductionExceptionHandlerResponse { + secureLog.error("Feil i streams, logger og leser neste record", exception) + return ProducerHandler.FAIL } + + override fun configure(configs: MutableMap) {} }