From 857a9101b241f2df6b50e1b2de7bbd3f99d6305c Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Fri, 17 Jan 2025 22:38:06 +0100 Subject: [PATCH] Update --- .../src/test/java/com/bakdata/kafka/CliTest.java | 4 ++-- .../bakdata/kafka/integration/StreamsRunnerTest.java | 10 +++++++--- .../testFixtures/java/com/bakdata/kafka/KafkaTest.java | 3 +++ 3 files changed, 12 insertions(+), 5 deletions(-) diff --git a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CliTest.java b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CliTest.java index 02fcad78..664d9ab0 100644 --- a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CliTest.java +++ b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CliTest.java @@ -24,9 +24,9 @@ package com.bakdata.kafka; -import static com.bakdata.kafka.KafkaTest.awaitAtMost; import static com.bakdata.kafka.KafkaTest.newCluster; import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; import com.bakdata.kafka.SenderBuilder.SimpleProducerRecord; import com.ginsberg.junit.exit.ExpectSystemExitWithStatus; @@ -244,7 +244,7 @@ public SerdeConfig defaultSerializationConfig() { .bootstrapServers(kafkaCluster.getBootstrapServers()) .build()).send() .to(input, List.of(new SimpleProducerRecord<>("foo", "bar"))); - awaitAtMost(Duration.ofSeconds(10L)).until(() -> !thread.isAlive()); + await("Thread is dead").atMost(Duration.ofSeconds(10L)).until(() -> !thread.isAlive()); } } diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsRunnerTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsRunnerTest.java index aeb0354b..cdca2da2 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsRunnerTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsRunnerTest.java @@ -24,7 +24,7 @@ package com.bakdata.kafka.integration; -import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -120,6 +120,10 @@ private static ConfiguredStreamsApp createErrorApplication() { .build()); } + private static void awaitThreadIsDead(final Thread thread) { + await("Thread is dead").atMost(TIMEOUT).until(() -> !thread.isAlive()); + } + @Test void shouldRunApp() { try (final ConfiguredStreamsApp app = createMirrorApplication(); @@ -184,7 +188,7 @@ void shouldThrowOnMissingInputTopic() { final Thread thread = run(runner); final CapturingUncaughtExceptionHandler handler = (CapturingUncaughtExceptionHandler) thread.getUncaughtExceptionHandler(); - awaitAtMost(TIMEOUT).untilAsserted(() -> assertThat(thread.isAlive()).isFalse()); // softly does not work + awaitThreadIsDead(thread); this.softly.assertThat(handler.getLastException()).isInstanceOf(MissingSourceTopicException.class); verify(this.uncaughtExceptionHandler).handle(any()); verify(this.stateListener).onChange(State.ERROR, State.PENDING_ERROR); @@ -211,7 +215,7 @@ void shouldCloseOnMapError() { .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class) .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class) .to(inputTopic, List.of(new SimpleProducerRecord<>("foo", "bar"))); - awaitAtMost(TIMEOUT).untilAsserted(() -> assertThat(thread.isAlive()).isFalse()); // softly does not work + awaitThreadIsDead(thread); this.softly.assertThat(handler.getLastException()).isInstanceOf(StreamsException.class) .satisfies(e -> this.softly.assertThat(e.getCause()).hasMessage("Error in map")); verify(this.uncaughtExceptionHandler).handle(any()); diff --git a/streams-bootstrap-core/src/testFixtures/java/com/bakdata/kafka/KafkaTest.java b/streams-bootstrap-core/src/testFixtures/java/com/bakdata/kafka/KafkaTest.java index b14cc2c3..b0ca274d 100644 --- a/streams-bootstrap-core/src/testFixtures/java/com/bakdata/kafka/KafkaTest.java +++ b/streams-bootstrap-core/src/testFixtures/java/com/bakdata/kafka/KafkaTest.java @@ -87,16 +87,19 @@ private static String getUniqueAppId(final ExecutableStreamsApp app) { protected void awaitProcessing(final ExecutableStreamsApp app, final Duration timeout) { this.awaitActive(app, timeout); awaitAtMost(timeout) + .alias("Consumer group has finished processing") .until(() -> this.hasFinishedProcessing(app)); } protected void awaitActive(final ExecutableStreamsApp app, final Duration timeout) { awaitAtMost(timeout) + .alias("Consumer group is active") .until(() -> this.isActive(app)); } protected void awaitClosed(final ExecutableStreamsApp app, final Duration timeout) { awaitAtMost(timeout) + .alias("Consumer group is closed") .until(() -> this.isClosed(app)); }