Skip to content

Commit

Permalink
fikser potensiell deadlock-issue
Browse files Browse the repository at this point in the history
implementerer en "alt eller ingenting"-algoritme siden det kan være begrenset
mengde topics tilgjengelig og at det kan være flere tråder som konkurrerer om samme ressurs (potensiell deadlock).
Eksempel:
  Bassenget består av to topics, og to tester kjører i parallell.
  Begge testene vil ha to topics.
  Begge får én topic hver, og så står begge to og venter på siste — men det er jo ingen igjen!
Derfor gjør vi et forsøk på å få 2 stk med en gang, eller så gir vi tilbake det vi fikk. Før eller
siden vil en av trådene kunne få to topics hver.
  • Loading branch information
davidsteinsland committed Nov 10, 2024
1 parent 1d9fefa commit 7abed1a
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 19 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.github.navikt.tbd_libs.test_support

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.isActive
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeout
import kotlinx.coroutines.withTimeoutOrNull
Expand Down Expand Up @@ -56,26 +57,41 @@ class KafkaContainer(
return nyeTopics(1, timeout).single()
}

suspend fun nyeTopics(antall: Int, timeout: Duration = Duration.ofSeconds(20)): List<TestTopic> {
suspend fun nyeTopics(antall: Int, timeout: Duration = Duration.ofSeconds(40)): List<TestTopic> {
check(antall <= poolSize) { "det er satt opp $poolSize topics, men $antall ønskes. det går ikke!" }
opprettTopicsHvisIkkeInitialisert()
returnTopicsOrThrow(antall, timeout)
}
private suspend fun fåTopicsOrThrow(antall: Int, timeout: Duration): List<TestTopic> {
val claimedTopics =Topics(antall, timeout)
if (claimedTopics.size != antall) {
droppTopics(claimedTopics)
throw RuntimeException("Fikk kun tak i ${claimedTopics.size} av $antall")
return withTimeoutOrNull(timeout.toKotlinDuration()) {
var claimedTopics: List<TestTopic> = emptyList()
// implementerer en "alt eller ingenting"-algoritme siden det kan være begrenset
// mengde topics tilgjengelig og at det kan være flere tråder som konkurrerer om samme ressurs (potensiell deadlock).
// Eksempel:
// Bassenget består av to topics, og to tester kjører i parallell.
// Begge testene vil ha to topics.
// Begge får én topic hver, og så står begge to og venter på siste — men det er jo ingen igjen!
// Derfor gjør vi et forsøk på å få 2 stk med en gang, eller så gir vi tilbake det vi fikk. Før eller
// siden vil en av trådene kunne få to topics hver.
while (isActive && claimedTopics.size != antall) {
droppTopics(claimedTopics)
claimedTopics =Topics(antall)
}
claimedTopics
}
println("> Får topic ${claimedTopics.joinToString { it.topicnavn} }")
return claimedTopics
?.also {
println("> Får topic ${it.joinToString { it.topicnavn} }")
}
?: throw RuntimeException("Fikk ikke tak i $antall topics innen ${timeout.toMillis()} millisekunder")
}

private suspend fun fåTopics(antall: Int, timeout: Duration): List<TestTopic> {
private fun fåTopics(antall: Int): List<TestTopic> {
// forsøker å få <antall> topics, ellers returneres det som var tilgjengelig
return buildList {
(1..antall).forEach {
withTimeoutOrNull(timeout.toKotlinDuration()) {
add(tilgjengeligeTopics.receive())
} ?: return@buildList
tilgjengeligeTopics.tryReceive().getOrNull()?.also {
add(it)
} ?: return@buildList
}
}
}
Expand All @@ -94,9 +110,13 @@ class KafkaContainer(
}

suspend fun droppTopics(testTopics: List<TestTopic>) {
if (testTopics.isEmpty()) return
println("Tilgjengeliggjør ${testTopics.joinToString { it.topicnavn }} igjen")
testTopics.forEach { it.cleanUp() }
withTimeout(20.seconds) { testTopics.forEach { tilgjengeligeTopics.send(it) } }
try {
testTopics.forEach { it.cleanUp() }
} finally {
withTimeout(20.seconds) { testTopics.forEach { tilgjengeligeTopics.send(it) } }
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,19 @@ import java.util.concurrent.ConcurrentHashMap
object KafkaContainers {
private val JUNIT_PARALLELISM = System.getProperty("junit.jupiter.execution.parallel.config.fixed.parallelism")?.toInt() ?: 1

private const val MIN_POOL_SIZE = 1
private const val AT_LEAST_ONE = 1
private val MIN_POOL_SIZE = maxOf(AT_LEAST_ONE, JUNIT_PARALLELISM)
private val MAX_POOL_SIZE = Runtime.getRuntime().availableProcessors()
private val POOL_SIZE = minOf(MAX_POOL_SIZE, maxOf(MIN_POOL_SIZE, JUNIT_PARALLELISM))
private val DEFAULT_POOL_SIZE = minOf(MAX_POOL_SIZE, MIN_POOL_SIZE)

private val instances = ConcurrentHashMap<String, KafkaContainer>()

// gjenbruker containers med samme navn for å unngå
// å spinne opp mange containers
fun container(appnavn: String, numberOfTopics: Int = POOL_SIZE): KafkaContainer {
fun container(appnavn: String, numberOfTopics: Int = DEFAULT_POOL_SIZE, minPoolSize: Int = MIN_POOL_SIZE): KafkaContainer {
val poolSize = maxOf(numberOfTopics, minPoolSize)
return instances.getOrPut(appnavn) {
KafkaContainer(appnavn, numberOfTopics)
KafkaContainer(appnavn, poolSize)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class TestTopic(
val consumerProperties = Properties().apply {
putAll(connectionProperties)
put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-$topicnavn")
//put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer")
put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import org.junit.jupiter.api.Assertions.assertNotEquals
import org.junit.jupiter.api.Assertions.assertTrue

const val MAX_TOPICS_SIZE = 4
val kafkaContainer = KafkaContainers.container("tbd-libs-kafka-test", numberOfTopics = MAX_TOPICS_SIZE)
val kafkaContainer = KafkaContainers.container("tbd-libs-kafka-test", numberOfTopics = MAX_TOPICS_SIZE, minPoolSize = MAX_TOPICS_SIZE)

class KafkaContainersTest {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import kotlin.random.Random

internal class RapidIntegrationTest {
private companion object {
private val kafkaContainer = KafkaContainers.container("tbd-rapid-and-rivers")
private val kafkaContainer = KafkaContainers.container("tbd-rapid-and-rivers", minPoolSize = 2)
}
private val objectMapper = jacksonObjectMapper()
.registerModule(JavaTimeModule())
Expand Down

0 comments on commit 7abed1a

Please sign in to comment.