Skip to content

Commit

Permalink
eget signal for når alle ressurser er lukket
Browse files Browse the repository at this point in the history
  • Loading branch information
davidsteinsland committed Dec 18, 2024
1 parent 105481e commit 73f8eec
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,27 @@ abstract class RapidsConnection : MessageContext {
listeners.add(listener)
}

protected fun notifyMessage(message: String, context: MessageContext, metadata: MessageMetadata, metrics: MeterRegistry) {
protected fun notifyMessage(
message: String,
context: MessageContext,
metadata: MessageMetadata,
metrics: MeterRegistry
) {
listeners.forEach { it.onMessage(message, context, metadata, metrics) }
}

protected fun notifyStartup() {
statusListeners.forEach { it.onStartup(this) }
}

protected fun notifyReady() {
statusListeners.forEach { it.onReady(this) }
}

protected fun notifyNotReady() {
statusListeners.forEach { it.onNotReady(this) }
}

protected fun notifyShutdownSignal() {
statusListeners.forEach {
try {
Expand All @@ -59,6 +67,16 @@ abstract class RapidsConnection : MessageContext {
}
}

protected fun notifyShutdownComplete() {
statusListeners.forEach {
try {
it.onShutdownComplete(this)
} catch (err: Exception) {
log.error("A shutdown callback threw an exception: ${err.message}", err)
}
}
}

abstract fun start()
abstract fun stop()

Expand All @@ -68,6 +86,7 @@ abstract class RapidsConnection : MessageContext {
fun onNotReady(rapidsConnection: RapidsConnection) {}
fun onShutdownSignal(rapidsConnection: RapidsConnection) {}
fun onShutdown(rapidsConnection: RapidsConnection) {}
fun onShutdownComplete(rapidsConnection: RapidsConnection) {}
}

fun interface MessageListener {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,8 @@ class KafkaRapid(

private fun onRecord(record: ConsumerRecord<String, String>) {
withMDC(recordDiganostics(record)) {
val recordValue = record.value() ?: return@withMDC log.info("ignoring record with offset ${record.offset()} in partition ${record.partition()} because value is null (tombstone)")
val recordValue = record.value()
?: return@withMDC log.info("ignoring record with offset ${record.offset()} in partition ${record.partition()} because value is null (tombstone)")
val context = KeyMessageContext(this, record.key())
val metadata = MessageMetadata(
topic = record.topic(),
Expand Down Expand Up @@ -165,6 +166,7 @@ class KafkaRapid(
} finally {
notifyShutdown()
closeResources(lastException)
notifyShutdownComplete()
}
}

Expand Down Expand Up @@ -193,6 +195,7 @@ class KafkaRapid(

private fun offsetMetadata(offset: Long): OffsetAndMetadata {
val clientId = consumer.groupMetadata().groupInstanceId().map { "\"$it\"" }.orElse("null")

@Language("JSON")
val metadata = """{"time": "${LocalDateTime.now()}","groupInstanceId": $clientId}"""
return OffsetAndMetadata(offset, metadata)
Expand All @@ -204,9 +207,12 @@ class KafkaRapid(
} else {
log.info("stopped consuming messages after receiving stop signal")
}
log.info("closing consumer")
tryAndLog(consumer::close)
producerClosed.set(true)
log.info("flushing producer")
tryAndLog(producer::flush)
log.info("closing producer")
tryAndLog(producer::close)
}

Expand All @@ -228,6 +234,7 @@ class KafkaRapid(
is RecordTooLargeException,
is UnknownServerException,
is AuthorizationException -> true

else -> false
}
}
Expand Down

0 comments on commit 73f8eec

Please sign in to comment.