From 73f8eecb02ad462296b840951c990710ad2fad44 Mon Sep 17 00:00:00 2001 From: david steinsland Date: Wed, 18 Dec 2024 11:32:51 +0100 Subject: [PATCH] =?UTF-8?q?eget=20signal=20for=20n=C3=A5r=20alle=20ressurs?= =?UTF-8?q?er=20er=20lukket?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../rapids_and_rivers_api/RapidsConnection.kt | 21 ++++++++++++++++++- .../tbd_libs/rapids_and_rivers/KafkaRapid.kt | 9 +++++++- 2 files changed, 28 insertions(+), 2 deletions(-) diff --git a/rapids-and-rivers-api/src/main/kotlin/com/github/navikt/tbd_libs/rapids_and_rivers_api/RapidsConnection.kt b/rapids-and-rivers-api/src/main/kotlin/com/github/navikt/tbd_libs/rapids_and_rivers_api/RapidsConnection.kt index c4fff77..adaaacd 100644 --- a/rapids-and-rivers-api/src/main/kotlin/com/github/navikt/tbd_libs/rapids_and_rivers_api/RapidsConnection.kt +++ b/rapids-and-rivers-api/src/main/kotlin/com/github/navikt/tbd_libs/rapids_and_rivers_api/RapidsConnection.kt @@ -26,19 +26,27 @@ abstract class RapidsConnection : MessageContext { listeners.add(listener) } - protected fun notifyMessage(message: String, context: MessageContext, metadata: MessageMetadata, metrics: MeterRegistry) { + protected fun notifyMessage( + message: String, + context: MessageContext, + metadata: MessageMetadata, + metrics: MeterRegistry + ) { listeners.forEach { it.onMessage(message, context, metadata, metrics) } } protected fun notifyStartup() { statusListeners.forEach { it.onStartup(this) } } + protected fun notifyReady() { statusListeners.forEach { it.onReady(this) } } + protected fun notifyNotReady() { statusListeners.forEach { it.onNotReady(this) } } + protected fun notifyShutdownSignal() { statusListeners.forEach { try { @@ -59,6 +67,16 @@ abstract class RapidsConnection : MessageContext { } } + protected fun notifyShutdownComplete() { + statusListeners.forEach { + try { + it.onShutdownComplete(this) + } catch (err: Exception) { + log.error("A shutdown callback threw an exception: ${err.message}", err) + } + } + } + abstract fun start() abstract fun stop() @@ -68,6 +86,7 @@ abstract class RapidsConnection : MessageContext { fun onNotReady(rapidsConnection: RapidsConnection) {} fun onShutdownSignal(rapidsConnection: RapidsConnection) {} fun onShutdown(rapidsConnection: RapidsConnection) {} + fun onShutdownComplete(rapidsConnection: RapidsConnection) {} } fun interface MessageListener { diff --git a/rapids-and-rivers/src/main/kotlin/com/github/navikt/tbd_libs/rapids_and_rivers/KafkaRapid.kt b/rapids-and-rivers/src/main/kotlin/com/github/navikt/tbd_libs/rapids_and_rivers/KafkaRapid.kt index c17fc60..263ee6c 100644 --- a/rapids-and-rivers/src/main/kotlin/com/github/navikt/tbd_libs/rapids_and_rivers/KafkaRapid.kt +++ b/rapids-and-rivers/src/main/kotlin/com/github/navikt/tbd_libs/rapids_and_rivers/KafkaRapid.kt @@ -130,7 +130,8 @@ class KafkaRapid( private fun onRecord(record: ConsumerRecord) { withMDC(recordDiganostics(record)) { - val recordValue = record.value() ?: return@withMDC log.info("ignoring record with offset ${record.offset()} in partition ${record.partition()} because value is null (tombstone)") + val recordValue = record.value() + ?: return@withMDC log.info("ignoring record with offset ${record.offset()} in partition ${record.partition()} because value is null (tombstone)") val context = KeyMessageContext(this, record.key()) val metadata = MessageMetadata( topic = record.topic(), @@ -165,6 +166,7 @@ class KafkaRapid( } finally { notifyShutdown() closeResources(lastException) + notifyShutdownComplete() } } @@ -193,6 +195,7 @@ class KafkaRapid( private fun offsetMetadata(offset: Long): OffsetAndMetadata { val clientId = consumer.groupMetadata().groupInstanceId().map { "\"$it\"" }.orElse("null") + @Language("JSON") val metadata = """{"time": "${LocalDateTime.now()}","groupInstanceId": $clientId}""" return OffsetAndMetadata(offset, metadata) @@ -204,9 +207,12 @@ class KafkaRapid( } else { log.info("stopped consuming messages after receiving stop signal") } + log.info("closing consumer") tryAndLog(consumer::close) producerClosed.set(true) + log.info("flushing producer") tryAndLog(producer::flush) + log.info("closing producer") tryAndLog(producer::close) } @@ -228,6 +234,7 @@ class KafkaRapid( is RecordTooLargeException, is UnknownServerException, is AuthorizationException -> true + else -> false } }