Skip to content

Commit

Permalink
Update
Browse files Browse the repository at this point in the history
  • Loading branch information
philipp94831 committed Jan 17, 2025
1 parent 0bc1440 commit 857a910
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@

package com.bakdata.kafka;

import static com.bakdata.kafka.KafkaTest.awaitAtMost;
import static com.bakdata.kafka.KafkaTest.newCluster;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

import com.bakdata.kafka.SenderBuilder.SimpleProducerRecord;
import com.ginsberg.junit.exit.ExpectSystemExitWithStatus;
Expand Down Expand Up @@ -244,7 +244,7 @@ public SerdeConfig defaultSerializationConfig() {
.bootstrapServers(kafkaCluster.getBootstrapServers())
.build()).send()
.to(input, List.of(new SimpleProducerRecord<>("foo", "bar")));
awaitAtMost(Duration.ofSeconds(10L)).until(() -> !thread.isAlive());
await("Thread is dead").atMost(Duration.ofSeconds(10L)).until(() -> !thread.isAlive());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

package com.bakdata.kafka.integration;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -120,6 +120,10 @@ private static ConfiguredStreamsApp<StreamsApp> createErrorApplication() {
.build());
}

private static void awaitThreadIsDead(final Thread thread) {
await("Thread is dead").atMost(TIMEOUT).until(() -> !thread.isAlive());
}

@Test
void shouldRunApp() {
try (final ConfiguredStreamsApp<StreamsApp> app = createMirrorApplication();
Expand Down Expand Up @@ -184,7 +188,7 @@ void shouldThrowOnMissingInputTopic() {
final Thread thread = run(runner);
final CapturingUncaughtExceptionHandler handler =
(CapturingUncaughtExceptionHandler) thread.getUncaughtExceptionHandler();
awaitAtMost(TIMEOUT).untilAsserted(() -> assertThat(thread.isAlive()).isFalse()); // softly does not work
awaitThreadIsDead(thread);
this.softly.assertThat(handler.getLastException()).isInstanceOf(MissingSourceTopicException.class);
verify(this.uncaughtExceptionHandler).handle(any());
verify(this.stateListener).onChange(State.ERROR, State.PENDING_ERROR);
Expand All @@ -211,7 +215,7 @@ void shouldCloseOnMapError() {
.with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
.with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
.to(inputTopic, List.of(new SimpleProducerRecord<>("foo", "bar")));
awaitAtMost(TIMEOUT).untilAsserted(() -> assertThat(thread.isAlive()).isFalse()); // softly does not work
awaitThreadIsDead(thread);
this.softly.assertThat(handler.getLastException()).isInstanceOf(StreamsException.class)
.satisfies(e -> this.softly.assertThat(e.getCause()).hasMessage("Error in map"));
verify(this.uncaughtExceptionHandler).handle(any());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,16 +87,19 @@ private static String getUniqueAppId(final ExecutableStreamsApp<?> app) {
protected void awaitProcessing(final ExecutableStreamsApp<?> app, final Duration timeout) {
this.awaitActive(app, timeout);
awaitAtMost(timeout)
.alias("Consumer group has finished processing")
.until(() -> this.hasFinishedProcessing(app));
}

protected void awaitActive(final ExecutableStreamsApp<?> app, final Duration timeout) {
awaitAtMost(timeout)
.alias("Consumer group is active")
.until(() -> this.isActive(app));
}

protected void awaitClosed(final ExecutableStreamsApp<?> app, final Duration timeout) {
awaitAtMost(timeout)
.alias("Consumer group is closed")
.until(() -> this.isClosed(app));
}

Expand Down

0 comments on commit 857a910

Please sign in to comment.