Replace kafka-junit with testcontainers (#264)
philipp94831 authored Jan 7, 2025
1 parent 7abb6da commit 7175260
Showing 20 changed files with 782 additions and 558 deletions.
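The substance of the change: the tests drop kafka-junit's in-JVM EmbeddedKafkaCluster and instead start a real broker in Docker through Testcontainers' ConfluentKafkaContainer. A minimal sketch of the new pattern, assuming a confluentinc/cp-kafka image (the exact tag is an assumption, not taken from this commit):

import org.testcontainers.kafka.ConfluentKafkaContainer;
import org.testcontainers.utility.DockerImageName;

final class KafkaClusterSketch {
    public static void main(final String[] args) {
        // Docker-backed broker; replaces kafka-junit's EmbeddedKafkaCluster
        try (final ConfluentKafkaContainer kafka =
                new ConfluentKafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.6.0"))) {
            kafka.start();
            // getBootstrapServers() replaces kafka-junit's getBrokerList()
            System.out.println(kafka.getBootstrapServers());
        }
    }
}

The diffs below follow directly from this swap: getBrokerList() becomes getBootstrapServers(), kafka-junit's KeyValue gives way to org.apache.kafka.streams.KeyValue, and topic management moves to an admin client exposed by the new KafkaContainerHelper test fixture.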
1 change: 1 addition & 0 deletions build.gradle.kts
@@ -67,6 +67,7 @@ configure<com.bakdata.gradle.SonatypeSettings> {

subprojects {
apply(plugin = "java-library")
apply(plugin = "java-test-fixtures")
apply(plugin = "io.freefair.lombok")

configure<JavaPluginExtension> {
4 changes: 2 additions & 2 deletions gradle.properties
@@ -1,9 +1,9 @@
version=3.1.1-SNAPSHOT
org.gradle.caching=true
# running Kafka JUnit in parallel causes problems
# running Kafka Streams in parallel causes problems with colliding consumer groups
org.gradle.parallel=false
kafkaVersion=3.6.1
kafkaJunitVersion=3.6.0
testContainersVersion=1.20.4
confluentVersion=7.6.0
fluentKafkaVersion=2.14.0
junitVersion=5.10.2
5 changes: 1 addition & 4 deletions streams-bootstrap-cli/build.gradle.kts
@@ -17,10 +17,7 @@ dependencies {
val mockitoVersion: String by project
testImplementation(group = "org.mockito", name = "mockito-core", version = mockitoVersion)
testImplementation(group = "org.mockito", name = "mockito-junit-jupiter", version = mockitoVersion)
val kafkaJunitVersion: String by project
testImplementation(group = "net.mguenther.kafka", name = "kafka-junit", version = kafkaJunitVersion) {
exclude(group = "org.slf4j", module = "slf4j-log4j12")
}
testImplementation(testFixtures(project(":streams-bootstrap-core")))
testImplementation(group = "com.ginsberg", name = "junit5-system-exit", version = "1.1.2")
val fluentKafkaVersion: String by project
testImplementation(
49 changes: 26 additions & 23 deletions streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CliTest.java
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2024 bakdata
* Copyright (c) 2025 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -24,23 +24,23 @@

package com.bakdata.kafka;

import static com.bakdata.kafka.KafkaContainerHelper.DEFAULT_TOPIC_SETTINGS;
import static com.bakdata.kafka.TestUtil.newKafkaCluster;
import static net.mguenther.kafka.junit.Wait.delay;
import static java.util.Collections.emptyMap;
import static org.assertj.core.api.Assertions.assertThat;

import com.bakdata.kafka.util.ImprovedAdminClient;
import com.ginsberg.junit.exit.ExpectSystemExitWithStatus;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import net.mguenther.kafka.junit.EmbeddedKafkaCluster;
import net.mguenther.kafka.junit.KeyValue;
import net.mguenther.kafka.junit.ReadKeyValues;
import net.mguenther.kafka.junit.SendKeyValues;
import net.mguenther.kafka.junit.TopicConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serdes.StringSerde;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Consumed;
import org.junit.jupiter.api.Test;
import org.testcontainers.kafka.ConfluentKafkaContainer;

class CliTest {

@@ -214,7 +214,7 @@ public SerdeConfig defaultSerializationConfig() {
@ExpectSystemExitWithStatus(1)
void shouldExitWithErrorInTopology() throws InterruptedException {
final String input = "input";
try (final EmbeddedKafkaCluster kafkaCluster = newKafkaCluster();
try (final ConfluentKafkaContainer kafkaCluster = newKafkaCluster();
final KafkaStreamsApplication<?> app = new SimpleKafkaStreamsApplication<>(() -> new StreamsApp() {
@Override
public void buildTopology(final TopologyBuilder builder) {
@@ -235,23 +235,23 @@ public SerdeConfig defaultSerializationConfig() {
}
})) {
kafkaCluster.start();
kafkaCluster.createTopic(TopicConfig.withName(input).build());

runApp(app,
"--bootstrap-server", kafkaCluster.getBrokerList(),
"--bootstrap-server", kafkaCluster.getBootstrapServers(),
"--input-topics", input
);
kafkaCluster.send(SendKeyValues.to(input, List.of(new KeyValue<>("foo", "bar"))));
delay(10, TimeUnit.SECONDS);
new KafkaContainerHelper(kafkaCluster).send()
.to(input, List.of(new KeyValue<>("foo", "bar")));
Thread.sleep(Duration.ofSeconds(10).toMillis());
}
}

@Test
@ExpectSystemExitWithStatus(0)
void shouldExitWithSuccessCodeOnShutdown() throws InterruptedException {
void shouldExitWithSuccessCodeOnShutdown() {
final String input = "input";
final String output = "output";
try (final EmbeddedKafkaCluster kafkaCluster = newKafkaCluster();
try (final ConfluentKafkaContainer kafkaCluster = newKafkaCluster();
final KafkaStreamsApplication<?> app = new SimpleKafkaStreamsApplication<>(() -> new StreamsApp() {
@Override
public void buildTopology(final TopologyBuilder builder) {
@@ -270,22 +270,25 @@ public SerdeConfig defaultSerializationConfig() {
}
})) {
kafkaCluster.start();
kafkaCluster.createTopic(TopicConfig.withName(input).build());
kafkaCluster.createTopic(TopicConfig.withName(output).build());
final KafkaContainerHelper kafkaContainerHelper = new KafkaContainerHelper(kafkaCluster);
try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) {
admin.getTopicClient().createTopic(output, DEFAULT_TOPIC_SETTINGS, emptyMap());
}

runApp(app,
"--bootstrap-server", kafkaCluster.getBrokerList(),
"--bootstrap-server", kafkaCluster.getBootstrapServers(),
"--input-topics", input,
"--output-topic", output
);
kafkaCluster.send(SendKeyValues.to(input, List.of(new KeyValue<>("foo", "bar"))));
delay(10, TimeUnit.SECONDS);
final List<KeyValue<String, String>> keyValues = kafkaCluster.read(ReadKeyValues.from(output));
kafkaContainerHelper.send()
.to(input, List.of(new KeyValue<>("foo", "bar")));
final List<ConsumerRecord<String, String>> keyValues = kafkaContainerHelper.read()
.from(output, Duration.ofSeconds(10));
assertThat(keyValues)
.hasSize(1)
.anySatisfy(kv -> {
assertThat(kv.getKey()).isEqualTo("foo");
assertThat(kv.getValue()).isEqualTo("bar");
assertThat(kv.key()).isEqualTo("foo");
assertThat(kv.value()).isEqualTo("bar");
});
}
}
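TestUtil.newKafkaCluster(), imported by these tests, lives in the streams-bootstrap-core test fixtures and is not rendered on this page. A plausible reconstruction, assuming it does no more than pin the container image (the tag is a guess):

import org.testcontainers.kafka.ConfluentKafkaContainer;
import org.testcontainers.utility.DockerImageName;

final class TestUtilSketch {
    private TestUtilSketch() {
    }

    // Hypothetical body of TestUtil#newKafkaCluster(); the real fixture may
    // pin a different confluentinc/cp-kafka tag or set extra container options.
    static ConfluentKafkaContainer newKafkaCluster() {
        return new ConfluentKafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.6.0"));
    }
}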
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2024 bakdata
* Copyright (c) 2025 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -25,25 +25,22 @@
package com.bakdata.kafka.integration;

import static com.bakdata.kafka.TestUtil.newKafkaCluster;
import static net.mguenther.kafka.junit.Wait.delay;
import static org.assertj.core.api.Assertions.assertThat;

import com.bakdata.kafka.KafkaContainerHelper;
import com.bakdata.kafka.KafkaProducerApplication;
import com.bakdata.kafka.ProducerApp;
import com.bakdata.kafka.ProducerBuilder;
import com.bakdata.kafka.ProducerRunnable;
import com.bakdata.kafka.SerializerConfig;
import com.bakdata.kafka.SimpleKafkaProducerApplication;
import com.bakdata.kafka.TestRecord;
import com.bakdata.kafka.util.ImprovedAdminClient;
import com.bakdata.schemaregistrymock.junit5.SchemaRegistryMockExtension;
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import net.mguenther.kafka.junit.EmbeddedKafkaCluster;
import net.mguenther.kafka.junit.ReadKeyValues;
import net.mguenther.kafka.junit.TopicConfig;
import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -53,12 +50,17 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.kafka.ConfluentKafkaContainer;

@Testcontainers
class RunProducerAppTest {
private static final int TIMEOUT_SECONDS = 10;
private static final Duration TIMEOUT = Duration.ofSeconds(10);
@RegisterExtension
final SchemaRegistryMockExtension schemaRegistryMockExtension = new SchemaRegistryMockExtension();
private final EmbeddedKafkaCluster kafkaCluster = newKafkaCluster();
@Container
private final ConfluentKafkaContainer kafkaCluster = newKafkaCluster();

@BeforeEach
void setup() {
@@ -73,7 +75,6 @@ void tearDown() {
@Test
void shouldRunApp() throws InterruptedException {
final String output = "output";
this.kafkaCluster.createTopic(TopicConfig.withName(output).useDefaults());
try (final KafkaProducerApplication<?> app = new SimpleKafkaProducerApplication<>(() -> new ProducerApp() {
@Override
public ProducerRunnable buildRunnable(final ProducerBuilder builder) {
@@ -90,30 +91,29 @@ public SerializerConfig defaultSerializationConfig() {
return new SerializerConfig(StringSerializer.class, SpecificAvroSerializer.class);
}
})) {
app.setBootstrapServers(this.kafkaCluster.getBrokerList());
app.setBootstrapServers(this.kafkaCluster.getBootstrapServers());
app.setSchemaRegistryUrl(this.schemaRegistryMockExtension.getUrl());
app.setOutputTopic(output);
app.setKafkaConfig(Map.of(
ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000"
));
app.run();
delay(TIMEOUT_SECONDS, TimeUnit.SECONDS);
assertThat(this.kafkaCluster.read(ReadKeyValues.from(output, String.class, TestRecord.class)
final KafkaContainerHelper kafkaContainerHelper = new KafkaContainerHelper(this.kafkaCluster);
assertThat(kafkaContainerHelper.read()
.with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
.with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SpecificAvroDeserializer.class)
.with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
this.schemaRegistryMockExtension.getUrl())
.build()))
.<String, TestRecord>from(output, TIMEOUT))
.hasSize(1)
.anySatisfy(kv -> {
assertThat(kv.getKey()).isEqualTo("foo");
assertThat(kv.getValue().getContent()).isEqualTo("bar");
assertThat(kv.key()).isEqualTo("foo");
assertThat(kv.value().getContent()).isEqualTo("bar");
});
app.clean();
delay(TIMEOUT_SECONDS, TimeUnit.SECONDS);
assertThat(this.kafkaCluster.exists(app.getOutputTopic()))
.as("Output topic is deleted")
.isFalse();
Thread.sleep(TIMEOUT.toMillis());
try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) {
assertThat(admin.getTopicClient().exists(app.getOutputTopic()))
.as("Output topic is deleted")
.isFalse();
}
}
}
}
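KafkaContainerHelper is likewise a fixture this page does not render. From its call sites above, read().with(...).from(topic, timeout) looks like a thin builder over a plain KafkaConsumer; a rough sketch under that assumption (class and method shapes are inferred, not the actual fixture):

import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Hypothetical read side of KafkaContainerHelper; only the call shape
// (with(...) for config, from(topic, timeout) to poll) comes from the tests.
final class ReadHelperSketch {
    private final Map<String, Object> config = new HashMap<>();

    ReadHelperSketch(final String bootstrapServers) {
        this.config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        this.config.put(ConsumerConfig.GROUP_ID_CONFIG, "test-reader");
        this.config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    }

    ReadHelperSketch with(final String key, final Object value) {
        this.config.put(key, value);
        return this;
    }

    <K, V> List<ConsumerRecord<K, V>> from(final String topic, final Duration timeout) {
        try (final KafkaConsumer<K, V> consumer = new KafkaConsumer<>(this.config)) {
            consumer.subscribe(List.of(topic));
            final List<ConsumerRecord<K, V>> records = new ArrayList<>();
            final long deadline = System.currentTimeMillis() + timeout.toMillis();
            // Poll until the timeout elapses, collecting everything seen so far
            while (System.currentTimeMillis() < deadline) {
                consumer.poll(Duration.ofMillis(100)).forEach(records::add);
            }
            return records;
        }
    }
}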
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2024 bakdata
* Copyright (c) 2025 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -24,72 +24,63 @@

package com.bakdata.kafka.integration;

import static com.bakdata.kafka.KafkaContainerHelper.DEFAULT_TOPIC_SETTINGS;
import static com.bakdata.kafka.TestUtil.newKafkaCluster;
import static net.mguenther.kafka.junit.Wait.delay;
import static java.util.Collections.emptyMap;
import static org.assertj.core.api.Assertions.assertThat;

import com.bakdata.kafka.KafkaContainerHelper;
import com.bakdata.kafka.KafkaStreamsApplication;
import com.bakdata.kafka.SimpleKafkaStreamsApplication;
import com.bakdata.kafka.test_applications.Mirror;
import com.bakdata.kafka.util.ImprovedAdminClient;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import net.mguenther.kafka.junit.EmbeddedKafkaCluster;
import net.mguenther.kafka.junit.KeyValue;
import net.mguenther.kafka.junit.ReadKeyValues;
import net.mguenther.kafka.junit.SendKeyValuesTransactional;
import net.mguenther.kafka.junit.TopicConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.apache.kafka.streams.KeyValue;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.junit.jupiter.MockitoExtension;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.kafka.ConfluentKafkaContainer;

@Testcontainers
@ExtendWith(MockitoExtension.class)
class RunStreamsAppTest {
private static final int TIMEOUT_SECONDS = 10;
private final EmbeddedKafkaCluster kafkaCluster = newKafkaCluster();

@BeforeEach
void setup() {
this.kafkaCluster.start();
}

@AfterEach
void tearDown() {
this.kafkaCluster.stop();
}
private static final Duration TIMEOUT = Duration.ofSeconds(10);
@Container
private final ConfluentKafkaContainer kafkaCluster = newKafkaCluster();

@Test
void shouldRunApp() throws InterruptedException {
void shouldRunApp() {
final String input = "input";
final String output = "output";
this.kafkaCluster.createTopic(TopicConfig.withName(input).useDefaults());
this.kafkaCluster.createTopic(TopicConfig.withName(output).useDefaults());
final KafkaContainerHelper kafkaContainerHelper = new KafkaContainerHelper(this.kafkaCluster);
try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) {
admin.getTopicClient().createTopic(output, DEFAULT_TOPIC_SETTINGS, emptyMap());
}
try (final KafkaStreamsApplication<?> app = new SimpleKafkaStreamsApplication<>(Mirror::new)) {
app.setBootstrapServers(this.kafkaCluster.getBrokerList());
app.setBootstrapServers(this.kafkaCluster.getBootstrapServers());
app.setKafkaConfig(Map.of(
ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000"
));
app.setInputTopics(List.of(input));
app.setOutputTopic(output);
// run in Thread because the application blocks indefinitely
new Thread(app).start();
final SendKeyValuesTransactional<String, String> kvSendKeyValuesTransactionalBuilder =
SendKeyValuesTransactional.inTransaction(input, List.of(new KeyValue<>("foo", "bar")))
.with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
.with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
.build();
this.kafkaCluster.send(kvSendKeyValuesTransactionalBuilder);
delay(TIMEOUT_SECONDS, TimeUnit.SECONDS);
assertThat(this.kafkaCluster.read(ReadKeyValues.from(output, String.class, String.class)
kafkaContainerHelper.send()
.with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
.with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
.to(input, List.of(new KeyValue<>("foo", "bar")));
assertThat(kafkaContainerHelper.read()
.with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
.with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
.build()))
.from(output, TIMEOUT))
.hasSize(1);
}
}
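The send side mirrors it: send().with(...).to(topic, keyValues) presumably wraps a KafkaProducer (again a sketch with the builder shape inferred from the calls above, not the actual fixture):

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.KeyValue;

// Hypothetical send side of KafkaContainerHelper; only the call shape
// (with(...) for config, to(topic, keyValues)) comes from the tests.
final class SendHelperSketch {
    private final Map<String, Object> config = new HashMap<>();

    SendHelperSketch(final String bootstrapServers) {
        this.config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    }

    SendHelperSketch with(final String key, final Object value) {
        this.config.put(key, value);
        return this;
    }

    <K, V> void to(final String topic, final List<KeyValue<K, V>> keyValues) {
        try (final KafkaProducer<K, V> producer = new KafkaProducer<>(this.config)) {
            for (final KeyValue<K, V> kv : keyValues) {
                producer.send(new ProducerRecord<>(topic, kv.key, kv.value));
            }
            producer.flush();
        }
    }
}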
