Replace kafka-junit with testcontainers
philipp94831 committed Jan 6, 2025
1 parent 7abb6da commit 02f7af3
Showing 18 changed files with 862 additions and 506 deletions.
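In short, the commit swaps kafka-junit's in-process EmbeddedKafkaCluster for a Kafka broker running in Docker via Testcontainers, which also makes it safe to re-enable parallel Gradle builds. A minimal before/after sketch of the lifecycle change (the class KafkaContainerSketch is illustrative; the image tag matches this repo's confluentVersion, and the kafka-junit calls are shown only for contrast):

// Before (kafka-junit): broker runs inside the test JVM
// EmbeddedKafkaCluster cluster = provisionWith(defaultClusterConfig());
// cluster.start();
// String bootstrap = cluster.getBrokerList();

// After (Testcontainers): broker runs in a Docker container
import org.testcontainers.kafka.ConfluentKafkaContainer;
import org.testcontainers.utility.DockerImageName;

class KafkaContainerSketch {
    void run() {
        try (ConfluentKafkaContainer kafka =
                new ConfluentKafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.6.0"))) {
            kafka.start();
            // host:port of the mapped listener, for any Kafka client
            String bootstrap = kafka.getBootstrapServers();
        }
    }
}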
5 changes: 2 additions & 3 deletions gradle.properties
@@ -1,9 +1,8 @@
 version=3.1.1-SNAPSHOT
 org.gradle.caching=true
-# running Kafka JUnit in parallel causes problems
-org.gradle.parallel=false
+org.gradle.parallel=true
 kafkaVersion=3.6.1
-kafkaJunitVersion=3.6.0
+testContainersVersion=1.20.4
 confluentVersion=7.6.0
 fluentKafkaVersion=2.14.0
 junitVersion=5.10.2
7 changes: 3 additions & 4 deletions streams-bootstrap-cli/build.gradle.kts
@@ -17,10 +17,9 @@ dependencies {
     val mockitoVersion: String by project
     testImplementation(group = "org.mockito", name = "mockito-core", version = mockitoVersion)
     testImplementation(group = "org.mockito", name = "mockito-junit-jupiter", version = mockitoVersion)
-    val kafkaJunitVersion: String by project
-    testImplementation(group = "net.mguenther.kafka", name = "kafka-junit", version = kafkaJunitVersion) {
-        exclude(group = "org.slf4j", module = "slf4j-log4j12")
-    }
+    val testContainersVersion: String by project
+    testImplementation(group = "org.testcontainers", name = "junit-jupiter", version = testContainersVersion)
+    testImplementation(group = "org.testcontainers", name = "kafka", version = testContainersVersion)
     testImplementation(group = "com.ginsberg", name = "junit5-system-exit", version = "1.1.2")
     val fluentKafkaVersion: String by project
     testImplementation(
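Two Testcontainers artifacts come in: kafka provides the container classes, while junit-jupiter adds optional annotation-driven lifecycle management that the tests below do not use (they start containers manually). A hypothetical sketch of the annotation style, for contrast:

import static org.assertj.core.api.Assertions.assertThat;

import org.junit.jupiter.api.Test;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.kafka.ConfluentKafkaContainer;
import org.testcontainers.utility.DockerImageName;

@Testcontainers
class AnnotatedLifecycleTest {

    // A static @Container is started once before the first test and stopped after the last
    @Container
    private static final ConfluentKafkaContainer KAFKA =
            new ConfluentKafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.6.0"));

    @Test
    void shouldProvideBootstrapServers() {
        assertThat(KAFKA.getBootstrapServers()).isNotEmpty();
    }
}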
50 changes: 27 additions & 23 deletions streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CliTest.java
@@ -1,7 +1,7 @@
 /*
  * MIT License
  *
- * Copyright (c) 2024 bakdata
+ * Copyright (c) 2025 bakdata
  *
  * Permission is hereby granted, free of charge, to any person obtaining a copy
  * of this software and associated documentation files (the "Software"), to deal
@@ -24,23 +24,23 @@
 
 package com.bakdata.kafka;
 
+import static com.bakdata.kafka.KafkaContainerHelper.DEFAULT_TOPIC_SETTINGS;
 import static com.bakdata.kafka.TestUtil.newKafkaCluster;
-import static net.mguenther.kafka.junit.Wait.delay;
+import static java.util.Collections.emptyMap;
 import static org.assertj.core.api.Assertions.assertThat;
 
+import com.bakdata.kafka.util.ImprovedAdminClient;
 import com.ginsberg.junit.exit.ExpectSystemExitWithStatus;
+import java.time.Duration;
 import java.util.List;
-import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
-import net.mguenther.kafka.junit.EmbeddedKafkaCluster;
-import net.mguenther.kafka.junit.KeyValue;
-import net.mguenther.kafka.junit.ReadKeyValues;
-import net.mguenther.kafka.junit.SendKeyValues;
-import net.mguenther.kafka.junit.TopicConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serdes.StringSerde;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.junit.jupiter.api.Test;
+import org.testcontainers.kafka.ConfluentKafkaContainer;
 
 class CliTest {
 
@@ -214,7 +214,7 @@ public SerdeConfig defaultSerializationConfig() {
     @ExpectSystemExitWithStatus(1)
     void shouldExitWithErrorInTopology() throws InterruptedException {
         final String input = "input";
-        try (final EmbeddedKafkaCluster kafkaCluster = newKafkaCluster();
+        try (final ConfluentKafkaContainer kafkaCluster = newKafkaCluster();
                 final KafkaStreamsApplication<?> app = new SimpleKafkaStreamsApplication<>(() -> new StreamsApp() {
                     @Override
                     public void buildTopology(final TopologyBuilder builder) {
@@ -235,23 +235,24 @@ public SerdeConfig defaultSerializationConfig() {
                     }
                 })) {
             kafkaCluster.start();
-            kafkaCluster.createTopic(TopicConfig.withName(input).build());
+            // kafkaCluster.createTopic(TopicConfig.withName(input).build());
 
             runApp(app,
-                    "--bootstrap-server", kafkaCluster.getBrokerList(),
+                    "--bootstrap-server", kafkaCluster.getBootstrapServers(),
                     "--input-topics", input
             );
-            kafkaCluster.send(SendKeyValues.to(input, List.of(new KeyValue<>("foo", "bar"))));
-            delay(10, TimeUnit.SECONDS);
+            new KafkaContainerHelper(kafkaCluster).send()
+                    .to(input, List.of(new KeyValue<>("foo", "bar")));
+            Thread.sleep(Duration.ofSeconds(10).toMillis());
         }
     }
 
     @Test
     @ExpectSystemExitWithStatus(0)
-    void shouldExitWithSuccessCodeOnShutdown() throws InterruptedException {
+    void shouldExitWithSuccessCodeOnShutdown() {
         final String input = "input";
         final String output = "output";
-        try (final EmbeddedKafkaCluster kafkaCluster = newKafkaCluster();
+        try (final ConfluentKafkaContainer kafkaCluster = newKafkaCluster();
                 final KafkaStreamsApplication<?> app = new SimpleKafkaStreamsApplication<>(() -> new StreamsApp() {
                     @Override
                     public void buildTopology(final TopologyBuilder builder) {
@@ -270,22 +271,25 @@ public SerdeConfig defaultSerializationConfig() {
                     }
                 })) {
             kafkaCluster.start();
-            kafkaCluster.createTopic(TopicConfig.withName(input).build());
-            kafkaCluster.createTopic(TopicConfig.withName(output).build());
+            final KafkaContainerHelper kafkaContainerHelper = new KafkaContainerHelper(kafkaCluster);
+            try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) {
+                admin.getTopicClient().createTopic(output, DEFAULT_TOPIC_SETTINGS, emptyMap());
+            }
 
             runApp(app,
-                    "--bootstrap-server", kafkaCluster.getBrokerList(),
+                    "--bootstrap-server", kafkaCluster.getBootstrapServers(),
                     "--input-topics", input,
                     "--output-topic", output
             );
-            kafkaCluster.send(SendKeyValues.to(input, List.of(new KeyValue<>("foo", "bar"))));
-            delay(10, TimeUnit.SECONDS);
-            final List<KeyValue<String, String>> keyValues = kafkaCluster.read(ReadKeyValues.from(output));
+            kafkaContainerHelper.send()
+                    .to(input, List.of(new KeyValue<>("foo", "bar")));
+            final List<ConsumerRecord<String, String>> keyValues = kafkaContainerHelper.read()
+                    .from(output, Duration.ofSeconds(10));
             assertThat(keyValues)
                     .hasSize(1)
                     .anySatisfy(kv -> {
-                        assertThat(kv.getKey()).isEqualTo("foo");
-                        assertThat(kv.getValue()).isEqualTo("bar");
+                        assertThat(kv.key()).isEqualTo("foo");
+                        assertThat(kv.value()).isEqualTo("bar");
                     });
         }
     }
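The fixed ten-second Thread.sleep in shouldExitWithErrorInTopology keeps the test simple but always pays the full wait. A plain-JDK polling helper (awaitTrue is a hypothetical name, not part of this commit) would return as soon as the condition holds; a sketch:

import java.time.Duration;
import java.time.Instant;
import java.util.function.BooleanSupplier;

final class TestWaits {

    private TestWaits() {
    }

    // Polls the condition until it holds or the timeout elapses
    static void awaitTrue(final BooleanSupplier condition, final Duration timeout) throws InterruptedException {
        final Instant deadline = Instant.now().plus(timeout);
        while (!condition.getAsBoolean()) {
            if (Instant.now().isAfter(deadline)) {
                throw new AssertionError("Condition not met within " + timeout);
            }
            Thread.sleep(100L);
        }
    }
}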
142 changes: 142 additions & 0 deletions …/com/bakdata/kafka/KafkaContainerHelper.java (new file)
@@ -0,0 +1,142 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2025 bakdata
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.bakdata.kafka;
+
+import com.bakdata.kafka.util.ImprovedAdminClient;
+import com.bakdata.kafka.util.TopicSettings;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KeyValue;
+import org.testcontainers.kafka.ConfluentKafkaContainer;
+
+@RequiredArgsConstructor
+public class KafkaContainerHelper {
+
+    public static final TopicSettings DEFAULT_TOPIC_SETTINGS = TopicSettings.builder()
+            .partitions(1)
+            .replicationFactor((short) 1)
+            .build();
+    private final @NonNull ConfluentKafkaContainer kafkaCluster;
+
+    private static <K, V> List<ConsumerRecord<K, V>> pollAll(final Consumer<K, V> consumer, final Duration timeout) {
+        final List<ConsumerRecord<K, V>> records = new ArrayList<>();
+        ConsumerRecords<K, V> poll = consumer.poll(timeout);
+        while (!poll.isEmpty()) {
+            poll.forEach(records::add);
+            poll = consumer.poll(timeout);
+        }
+        return records;
+    }
+
+    public SenderBuilder send() {
+        return new SenderBuilder(Map.of(
+                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
+                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class
+        ));
+    }
+
+    public ReaderBuilder read() {
+        return new ReaderBuilder(Map.of(
+                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
+                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class
+        ));
+    }
+
+    public ImprovedAdminClient admin() {
+        return ImprovedAdminClient.create(Map.of(
+                AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaContainerHelper.this.kafkaCluster.getBootstrapServers()
+        ));
+    }
+
+    @RequiredArgsConstructor
+    public class SenderBuilder {
+
+        private final @NonNull Map<String, Object> properties;
+
+        public SenderBuilder with(final String key, final Object value) {
+            final Map<String, Object> newProperties = new HashMap<>(this.properties);
+            newProperties.put(key, value);
+            return new SenderBuilder(Map.copyOf(newProperties));
+        }
+
+        public <K, V> void to(final String topic, final Iterable<? extends KeyValue<K, V>> records) {
+            final Map<String, Object> producerConfig = new HashMap<>(this.properties);
+            producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
+                    KafkaContainerHelper.this.kafkaCluster.getBootstrapServers());
+            try (final Producer<K, V> producer = new KafkaProducer<>(producerConfig)) {
+                records.forEach(kv -> producer.send(new ProducerRecord<>(topic, kv.key, kv.value)));
+            }
+        }
+
+    }
+
+    @RequiredArgsConstructor
+    public class ReaderBuilder {
+
+        private final @NonNull Map<String, Object> properties;
+
+        public ReaderBuilder with(final String key, final Object value) {
+            final Map<String, Object> newProperties = new HashMap<>(this.properties);
+            newProperties.put(key, value);
+            return new ReaderBuilder(Map.copyOf(newProperties));
+        }
+
+        public <K, V> List<ConsumerRecord<K, V>> from(final String output, final Duration timeout) {
+            final Map<String, Object> consumerConfig = new HashMap<>(this.properties);
+            consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
+                    KafkaContainerHelper.this.kafkaCluster.getBootstrapServers());
+            try (final Consumer<K, V> consumer = new KafkaConsumer<>(consumerConfig)) {
+                final List<PartitionInfo> partitionInfos = consumer.listTopics().get(output);
+                final List<TopicPartition> topicPartitions = partitionInfos.stream()
+                        .map(partition -> new TopicPartition(partition.topic(), partition.partition()))
+                        .collect(Collectors.toList());
+                consumer.assign(topicPartitions);
+                consumer.seekToBeginning(topicPartitions);
+                return pollAll(consumer, timeout);
+            }
+        }
+
+    }
+}
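Putting the helper to work, a minimal usage sketch (assumes a started ConfluentKafkaContainer named kafkaCluster and an existing topic named "input"; with(...) can override any producer or consumer setting before calling to(...) or from(...)):

final KafkaContainerHelper helper = new KafkaContainerHelper(kafkaCluster);

// Produce String records using the default StringSerializer configuration
helper.send()
        .to("input", List.of(new KeyValue<>("foo", "bar")));

// Read everything currently in the topic; polling stops once a poll returns empty
final List<ConsumerRecord<String, String>> records = helper.read()
        .from("input", Duration.ofSeconds(10));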
18 changes: 5 additions & 13 deletions …/com/bakdata/kafka/TestUtil.java
@@ -1,7 +1,7 @@
 /*
  * MIT License
  *
- * Copyright (c) 2024 bakdata
+ * Copyright (c) 2025 bakdata
  *
  * Permission is hereby granted, free of charge, to any person obtaining a copy
  * of this software and associated documentation files (the "Software"), to deal
@@ -24,21 +24,13 @@
 
 package com.bakdata.kafka;
 
-import static net.mguenther.kafka.junit.EmbeddedKafkaCluster.provisionWith;
-import static net.mguenther.kafka.junit.EmbeddedKafkaClusterConfig.newClusterConfig;
-import static net.mguenther.kafka.junit.EmbeddedKafkaConfig.brokers;
-
 import lombok.experimental.UtilityClass;
-import net.mguenther.kafka.junit.EmbeddedKafkaCluster;
+import org.testcontainers.kafka.ConfluentKafkaContainer;
+import org.testcontainers.utility.DockerImageName;
 
 @UtilityClass
 public class TestUtil {
-    public static EmbeddedKafkaCluster newKafkaCluster() {
-        return provisionWith(newClusterConfig()
-                .configure(brokers()
-                        .with("transaction.state.log.num.partitions", 10)
-                        .with("offsets.topic.num.partitions", 10)
-                        .build())
-                .build());
+    public static ConfluentKafkaContainer newKafkaCluster() {
+        return new ConfluentKafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.6.0"));
     }
 }
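Note that the broker overrides from the kafka-junit setup (transaction.state.log.num.partitions and offsets.topic.num.partitions) are dropped rather than ported. If they were still needed, the Confluent images read broker settings from KAFKA_-prefixed environment variables, so a hedged sketch (not part of this commit) could look like:

public static ConfluentKafkaContainer newKafkaCluster() {
    return new ConfluentKafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.6.0"))
            // Confluent images map KAFKA_<UPPER_SNAKE> env vars to broker settings
            .withEnv("KAFKA_TRANSACTION_STATE_LOG_NUM_PARTITIONS", "10")
            .withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", "10");
}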
(13 more changed files not shown)
