Use Awaitility (#273)

philipp94831 authored Jan 20, 2025
1 parent cc28c79 commit d967deb
Showing 16 changed files with 405 additions and 185 deletions.
1 change: 1 addition & 0 deletions gradle.properties
@@ -10,4 +10,5 @@ junitVersion=5.11.4
mockitoVersion=5.15.2
assertJVersion=3.27.2
log4jVersion=2.24.3
+awaitilityVersion=4.2.2
org.gradle.jvmargs=-Xmx4096m
KafkaApplication.java
@@ -1,7 +1,7 @@
/*
* MIT License
*
- * Copyright (c) 2024 bakdata
+ * Copyright (c) 2025 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -95,7 +95,7 @@ public abstract class KafkaApplication<R extends Runner, CR extends CleanUpRunner
private Map<String, String> kafkaConfig = emptyMap();

/**
- * <p>This methods needs to be called in the executable custom application class inheriting from
+ * <p>This method needs to be called in the executable custom application class inheriting from
* {@code KafkaApplication}.</p>
* <p>This method calls System exit</p>
*
@@ -109,7 +109,7 @@ public static void startApplication(final KafkaApplication<?, ?, ?, ?, ?, ?, ?>
}

/**
- * <p>This methods needs to be called in the executable custom application class inheriting from
+ * <p>This method needs to be called in the executable custom application class inheriting from
* {@code KafkaApplication}.</p>
*
* @param app An instance of the custom application class.
CliTest.java

@@ -27,6 +27,7 @@
import static com.bakdata.kafka.KafkaTest.POLL_TIMEOUT;
import static com.bakdata.kafka.KafkaTest.newCluster;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;

import com.bakdata.kafka.SenderBuilder.SimpleProducerRecord;
import com.ginsberg.junit.exit.ExpectSystemExitWithStatus;
@@ -46,8 +47,10 @@

class CliTest {

-private static void runApp(final KafkaStreamsApplication<?> app, final String... args) {
-    new Thread(() -> KafkaApplication.startApplication(app, args)).start();
+private static Thread runApp(final KafkaStreamsApplication<?> app, final String... args) {
+    final Thread thread = new Thread(() -> KafkaApplication.startApplication(app, args));
+    thread.start();
+    return thread;
}

@Test
@@ -214,7 +217,7 @@ public SerdeConfig defaultSerializationConfig() {

@Test
@ExpectSystemExitWithStatus(1)
-void shouldExitWithErrorInTopology() throws InterruptedException {
+void shouldExitWithErrorInTopology() {
final String input = "input";
try (final KafkaContainer kafkaCluster = newCluster();
final KafkaStreamsApplication<?> app = new SimpleKafkaStreamsApplication<>(() -> new StreamsApp() {
@@ -238,7 +241,7 @@ public SerdeConfig defaultSerializationConfig() {
})) {
kafkaCluster.start();

-runApp(app,
+final Thread thread = runApp(app,
"--bootstrap-server", kafkaCluster.getBootstrapServers(),
"--input-topics", input
);
@@ -248,7 +251,7 @@
.with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
.with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
.to(input, List.of(new SimpleProducerRecord<>("foo", "bar")));
-Thread.sleep(Duration.ofSeconds(10).toMillis());
+await("Thread is dead").atMost(Duration.ofSeconds(10L)).until(() -> !thread.isAlive());
}
}

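The change above swaps a fixed ten-second Thread.sleep for a polled condition: Awaitility re-evaluates the lambda until it returns true or the timeout expires, so the test finishes as soon as the application thread dies instead of always waiting the full ten seconds. A minimal self-contained sketch of the same idiom, assuming only the Awaitility dependency added in this commit (the worker thread is illustrative, not from this repository):

import static org.awaitility.Awaitility.await;

import java.time.Duration;

class AwaitilityIdiom {
    public static void main(final String[] args) {
        // illustrative stand-in for the application thread returned by runApp
        final Thread worker = new Thread(() -> {
            try {
                Thread.sleep(500L); // simulate work that ends on its own
            } catch (final InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        // poll until the thread has died; fails with a ConditionTimeoutException after 10s
        await("Thread is dead")
                .atMost(Duration.ofSeconds(10L))
                .until(() -> !worker.isAlive());
    }
}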
RunProducerAppTest.java

@@ -38,7 +38,6 @@
import com.bakdata.kafka.util.ImprovedAdminClient;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer;
-import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -47,10 +46,9 @@
import org.junit.jupiter.api.Test;

class RunProducerAppTest extends KafkaTest {
-private static final Duration TIMEOUT = Duration.ofSeconds(10);

@Test
-void shouldRunApp() throws InterruptedException {
+void shouldRunApp() {
final String output = "output";
try (final KafkaProducerApplication<?> app = new SimpleKafkaProducerApplication<>(() -> new ProducerApp() {
@Override
@@ -84,7 +82,6 @@ public SerializerConfig defaultSerializationConfig() {
assertThat(kv.value().getContent()).isEqualTo("bar");
});
app.clean();
-Thread.sleep(TIMEOUT.toMillis());
try (final ImprovedAdminClient admin = testClient.admin()) {
assertThat(admin.getTopicClient().exists(app.getOutputTopic()))
.as("Output topic is deleted")
RunStreamsAppTest.java

@@ -39,10 +39,7 @@
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.junit.jupiter.MockitoExtension;

-@ExtendWith(MockitoExtension.class)
class RunStreamsAppTest extends KafkaTest {

@Test
StreamsCleanUpTest.java

@@ -33,7 +33,6 @@
import com.bakdata.kafka.SimpleKafkaStreamsApplication;
import com.bakdata.kafka.test_applications.WordCount;
import com.bakdata.kafka.util.ImprovedAdminClient;
-import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -52,33 +51,15 @@
import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.junit.jupiter.MockitoExtension;
-import org.mockito.junit.jupiter.MockitoSettings;
-import org.mockito.quality.Strictness;

@Slf4j
@ExtendWith(SoftAssertionsExtension.class)
-@ExtendWith(MockitoExtension.class)
-@MockitoSettings(strictness = Strictness.STRICT_STUBS)
class StreamsCleanUpTest extends KafkaTest {
-private static final Duration TIMEOUT = Duration.ofSeconds(10);
@InjectSoftAssertions
private SoftAssertions softly;

-private static void runAppAndClose(final KafkaStreamsApplication<?> app) throws InterruptedException {
-    runApp(app);
-    app.stop();
-}
-
-private static void runApp(final KafkaStreamsApplication<?> app) throws InterruptedException {
-    // run in Thread because the application blocks indefinitely
-    new Thread(app).start();
-    // Wait until stream application has consumed all data
-    Thread.sleep(TIMEOUT.toMillis());
-}
-
@Test
-void shouldClean() throws InterruptedException {
+void shouldClean() {
try (final KafkaStreamsApplication<?> app = this.createWordCountApplication()) {
final KafkaTestClient testClient = this.newTestClient();
testClient.createTopic(app.getOutputTopic());
@@ -98,8 +79,8 @@ void shouldClean() throws InterruptedException {
);
this.runAndAssertContent(expectedValues, "All entries are once in the input topic after the 1st run", app);

-// Wait until all stream application are completely stopped before triggering cleanup
-Thread.sleep(TIMEOUT.toMillis());
+// Wait until all stream applications are completely stopped before triggering cleanup
+this.awaitClosed(app.createExecutableApp());
app.clean();

try (final ImprovedAdminClient admin = testClient.admin()) {
@@ -114,7 +95,7 @@
}

@Test
-void shouldReset() throws InterruptedException {
+void shouldReset() {
try (final KafkaStreamsApplication<?> app = this.createWordCountApplication()) {
final KafkaTestClient testClient = this.newTestClient();
testClient.createTopic(app.getOutputTopic());
@@ -134,8 +115,8 @@ void shouldReset() throws InterruptedException {
);
this.runAndAssertContent(expectedValues, "All entries are once in the input topic after the 1st run", app);

-// Wait until all stream application are completely stopped before triggering cleanup
-Thread.sleep(TIMEOUT.toMillis());
+// Wait until all stream applications are completely stopped before triggering cleanup
+this.awaitClosed(app.createExecutableApp());
app.reset();

try (final ImprovedAdminClient admin = testClient.admin()) {
@@ -152,21 +133,31 @@
}

@Test
-void shouldCallClose() throws InterruptedException {
+void shouldCallClose() {
try (final CloseFlagApp app = this.createCloseFlagApplication()) {
this.newTestClient().createTopic(app.getInputTopics().get(0));
-Thread.sleep(TIMEOUT.toMillis());
this.softly.assertThat(app.isClosed()).isFalse();
this.softly.assertThat(app.isAppClosed()).isFalse();
app.clean();
this.softly.assertThat(app.isAppClosed()).isTrue();
app.setAppClosed(false);
-Thread.sleep(TIMEOUT.toMillis());
app.reset();
this.softly.assertThat(app.isAppClosed()).isTrue();
}
}

+private void runAppAndClose(final KafkaStreamsApplication<?> app) {
+    this.runApp(app);
+    app.stop();
+}
+
+private void runApp(final KafkaStreamsApplication<?> app) {
+    // run in Thread because the application blocks indefinitely
+    new Thread(app).start();
+    // Wait until stream application has consumed all data
+    this.awaitProcessing(app.createExecutableApp());
+}

private CloseFlagApp createCloseFlagApplication() {
final CloseFlagApp app = new CloseFlagApp();
app.setInputTopics(List.of("input"));
Expand All @@ -185,9 +176,8 @@ private List<KeyValue<String, Long>> readOutputTopic(final String outputTopic) {
}

private void runAndAssertContent(final Iterable<? extends KeyValue<String, Long>> expectedValues,
-        final String description, final KafkaStreamsApplication<?> app)
-        throws InterruptedException {
-    runAppAndClose(app);
+        final String description, final KafkaStreamsApplication<?> app) {
+    this.runAppAndClose(app);

final List<KeyValue<String, Long>> output = this.readOutputTopic(app.getOutputTopic());
this.softly.assertThat(output)
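The helpers this.awaitClosed(...) and this.awaitProcessing(...) used above come from the KafkaTest fixtures, which are among the files not expanded on this page. A rough sketch of how such a helper could be built on Awaitility together with the describe(...) method added further down in this commit; the class name ConsumerGroupClient, the method signature, and the EMPTY/DEAD state check are assumptions for illustration, not the committed implementation:

import static org.awaitility.Awaitility.await;

import java.time.Duration;
import org.apache.kafka.common.ConsumerGroupState;

final class AwaitClosedSketch {
    // Hypothetical helper: polls the consumer group state until the Streams app has shut down
    static void awaitClosed(final ConsumerGroupClient groups, final String groupId) {
        await("Consumer group is closed")
                .atMost(Duration.ofSeconds(10L))
                .until(() -> {
                    final ConsumerGroupState state = groups.describe(groupId).state();
                    return state == ConsumerGroupState.EMPTY || state == ConsumerGroupState.DEAD;
                });
    }
}

Compared to a fixed sleep, polling like this shortens the happy path and turns a silent under-wait into an explicit timeout failure.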
2 changes: 2 additions & 0 deletions streams-bootstrap-core/build.gradle.kts
@@ -38,6 +38,8 @@ dependencies {
testImplementation(group = "io.confluent", name = "kafka-streams-avro-serde", version = confluentVersion)
val log4jVersion: String by project
testImplementation(group = "org.apache.logging.log4j", name = "log4j-slf4j2-impl", version = log4jVersion)
+val awaitilityVersion: String by project
+testFixturesApi(group = "org.awaitility", name = "awaitility", version = awaitilityVersion)
}

tasks.withType<Test> {
@@ -36,7 +36,10 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.GroupIdNotFoundException;

/**
@@ -68,6 +71,14 @@ private static KafkaAdminException failedToListGroups(final Throwable ex) {
return new KafkaAdminException("Failed to list consumer groups", ex);
}

+private static KafkaAdminException failedToListOffsets(final String groupName, final Throwable ex) {
+    return new KafkaAdminException("Failed to list offsets for consumer group " + groupName, ex);
+}
+
+private static KafkaAdminException failedToDescribeGroup(final String groupName, final Throwable ex) {
+    return new KafkaAdminException("Failed to describe consumer group " + groupName, ex);
+}

/**
* Delete a consumer group.
*
@@ -93,6 +104,63 @@ public void deleteConsumerGroup(final String groupName) {
}
}

+/**
+ * Describe a consumer group.
+ *
+ * @param groupName the consumer group name
+ * @return consumer group description
+ */
+public ConsumerGroupDescription describe(final String groupName) {
+    log.info("Describing consumer group '{}'", groupName);
+    try {
+        final ConsumerGroupDescription description =
+                this.adminClient.describeConsumerGroups(List.of(groupName))
+                        .all()
+                        .get(this.timeout.toSeconds(), TimeUnit.SECONDS)
+                        .get(groupName);
+        log.info("Described consumer group '{}'", groupName);
+        return description;
+    } catch (final InterruptedException ex) {
+        Thread.currentThread().interrupt();
+        throw failedToDescribeGroup(groupName, ex);
+    } catch (final ExecutionException ex) {
+        if (ex.getCause() instanceof RuntimeException) {
+            throw (RuntimeException) ex.getCause();
+        }
+        throw failedToDescribeGroup(groupName, ex);
+    } catch (final TimeoutException ex) {
+        throw failedToDescribeGroup(groupName, ex);
+    }
+}
+
+/**
+ * List offsets for a consumer group.
+ *
+ * @param groupName the consumer group name
+ * @return consumer group offsets
+ */
+public Map<TopicPartition, OffsetAndMetadata> listOffsets(final String groupName) {
+    log.info("Listing offsets for consumer group '{}'", groupName);
+    try {
+        final Map<TopicPartition, OffsetAndMetadata> offsets =
+                this.adminClient.listConsumerGroupOffsets(groupName)
+                        .partitionsToOffsetAndMetadata(groupName)
+                        .get(this.timeout.toSeconds(), TimeUnit.SECONDS);
+        log.info("Listed offsets for consumer group '{}'", groupName);
+        return offsets;
+    } catch (final InterruptedException ex) {
+        Thread.currentThread().interrupt();
+        throw failedToListOffsets(groupName, ex);
+    } catch (final ExecutionException ex) {
+        if (ex.getCause() instanceof RuntimeException) {
+            throw (RuntimeException) ex.getCause();
+        }
+        throw failedToListOffsets(groupName, ex);
+    } catch (final TimeoutException ex) {
+        throw failedToListOffsets(groupName, ex);
+    }
+}

@Override
public void close() {
this.adminClient.close();
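A short usage sketch of the two new methods. The enclosing class is referred to as ConsumerGroupClient here since its name is not visible in this diff, and obtaining an instance is assumed rather than shown:

import java.util.Map;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

final class ConsumerGroupInspection {
    // Prints the state and the committed offset of every partition for a consumer group
    static void inspect(final ConsumerGroupClient groups, final String groupName) {
        final ConsumerGroupDescription description = groups.describe(groupName);
        System.out.println("State: " + description.state());

        final Map<TopicPartition, OffsetAndMetadata> offsets = groups.listOffsets(groupName);
        offsets.forEach((partition, offset) ->
                System.out.println(partition + " -> " + offset.offset()));
    }
}

Note that both methods unwrap a RuntimeException cause from the ExecutionException, so callers see the original Kafka error (for example GroupIdNotFoundException, which is imported above) rather than a generic wrapper.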
(The remaining changed files are not expanded on this page.)
