Add methods for simplified testing of Kafka endpoints (#270)
philipp94831 authored Jan 20, 2025
1 parent 2271bc7 commit cc28c79
Showing 18 changed files with 691 additions and 550 deletions.
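In short: the commit replaces the ad-hoc KafkaContainerHelper with a KafkaTestClient that is built from a KafkaEndpointConfig (or obtained from the new KafkaTest base class). A minimal sketch of the producing side, assembled from the calls visible in the diffs below; the wrapper class and topic names are illustrative, not part of the commit:

    import java.util.List;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;
    import com.bakdata.kafka.KafkaEndpointConfig;
    import com.bakdata.kafka.KafkaTestClient;
    import com.bakdata.kafka.SenderBuilder.SimpleProducerRecord;

    class ProduceSketch {
        // Build a test client against a running cluster and publish one record.
        static void produce(final String bootstrapServers) {
            final KafkaTestClient testClient = new KafkaTestClient(KafkaEndpointConfig.builder()
                    .bootstrapServers(bootstrapServers)
                    .build());
            // Topic creation no longer requires a separate admin client
            testClient.createTopic("input");
            // Serializers are configured per call instead of on a shared helper
            testClient.send()
                    .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
                    .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
                    .to("input", List.of(new SimpleProducerRecord<>("foo", "bar")));
        }
    }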
44 changes: 27 additions & 17 deletions streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CliTest.java
@@ -24,20 +24,22 @@

package com.bakdata.kafka;

import static com.bakdata.kafka.KafkaContainerHelper.DEFAULT_TOPIC_SETTINGS;
import static com.bakdata.kafka.TestUtil.newKafkaCluster;
import static java.util.Collections.emptyMap;
import static com.bakdata.kafka.KafkaTest.POLL_TIMEOUT;
import static com.bakdata.kafka.KafkaTest.newCluster;
import static org.assertj.core.api.Assertions.assertThat;

import com.bakdata.kafka.util.ImprovedAdminClient;
import com.bakdata.kafka.SenderBuilder.SimpleProducerRecord;
import com.ginsberg.junit.exit.ExpectSystemExitWithStatus;
import java.time.Duration;
import java.util.List;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serdes.StringSerde;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.kstream.Consumed;
import org.junit.jupiter.api.Test;
import org.testcontainers.kafka.KafkaContainer;
@@ -214,7 +216,7 @@ public SerdeConfig defaultSerializationConfig() {
@ExpectSystemExitWithStatus(1)
void shouldExitWithErrorInTopology() throws InterruptedException {
final String input = "input";
try (final KafkaContainer kafkaCluster = newKafkaCluster();
try (final KafkaContainer kafkaCluster = newCluster();
final KafkaStreamsApplication<?> app = new SimpleKafkaStreamsApplication<>(() -> new StreamsApp() {
@Override
public void buildTopology(final TopologyBuilder builder) {
@@ -240,8 +242,12 @@ public SerdeConfig defaultSerializationConfig() {
"--bootstrap-server", kafkaCluster.getBootstrapServers(),
"--input-topics", input
);
new KafkaContainerHelper(kafkaCluster).send()
.to(input, List.of(new KeyValue<>("foo", "bar")));
new KafkaTestClient(KafkaEndpointConfig.builder()
.bootstrapServers(kafkaCluster.getBootstrapServers())
.build()).send()
.with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
.with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
.to(input, List.of(new SimpleProducerRecord<>("foo", "bar")));
Thread.sleep(Duration.ofSeconds(10).toMillis());
}
}
@@ -251,7 +257,7 @@ public SerdeConfig defaultSerializationConfig() {
void shouldExitWithSuccessCodeOnShutdown() {
final String input = "input";
final String output = "output";
try (final KafkaContainer kafkaCluster = newKafkaCluster();
try (final KafkaContainer kafkaCluster = newCluster();
final KafkaStreamsApplication<?> app = new SimpleKafkaStreamsApplication<>(() -> new StreamsApp() {
@Override
public void buildTopology(final TopologyBuilder builder) {
@@ -270,20 +276,24 @@ public SerdeConfig defaultSerializationConfig() {
}
})) {
kafkaCluster.start();
final KafkaContainerHelper kafkaContainerHelper = new KafkaContainerHelper(kafkaCluster);
try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) {
admin.getTopicClient().createTopic(output, DEFAULT_TOPIC_SETTINGS, emptyMap());
}
final KafkaTestClient testClient = new KafkaTestClient(KafkaEndpointConfig.builder()
.bootstrapServers(kafkaCluster.getBootstrapServers())
.build());
testClient.createTopic(output);

runApp(app,
"--bootstrap-server", kafkaCluster.getBootstrapServers(),
"--input-topics", input,
"--output-topic", output
);
kafkaContainerHelper.send()
.to(input, List.of(new KeyValue<>("foo", "bar")));
final List<ConsumerRecord<String, String>> keyValues = kafkaContainerHelper.read()
.from(output, Duration.ofSeconds(10));
testClient.send()
.with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
.with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
.to(input, List.of(new SimpleProducerRecord<>("foo", "bar")));
final List<ConsumerRecord<String, String>> keyValues = testClient.read()
.with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
.with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
.from(output, POLL_TIMEOUT);
assertThat(keyValues)
.hasSize(1)
.anySatisfy(kv -> {
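The consuming side follows the same builder pattern: CliTest above reads typed records back with explicit deserializers and the POLL_TIMEOUT constant the new KafkaTest class exposes. A hedged sketch of that pattern (the wrapper class and asserted values are illustrative):

    import static com.bakdata.kafka.KafkaTest.POLL_TIMEOUT;
    import static org.assertj.core.api.Assertions.assertThat;

    import com.bakdata.kafka.KafkaTestClient;
    import java.util.List;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.serialization.StringDeserializer;

    class ReadSketch {
        // Consume from a topic and assert on the records, as CliTest does above.
        static void verifyOutput(final KafkaTestClient testClient, final String output) {
            final List<ConsumerRecord<String, String>> records = testClient.read()
                    .with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
                    .with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
                    .from(output, POLL_TIMEOUT);
            assertThat(records)
                    .hasSize(1)
                    .anySatisfy(record -> {
                        assertThat(record.key()).isEqualTo("foo");
                        assertThat(record.value()).isEqualTo("bar");
                    });
        }
    }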
RunProducerAppTest.java
@@ -24,19 +24,18 @@

package com.bakdata.kafka.integration;

import static com.bakdata.kafka.TestUtil.newKafkaCluster;
import static org.assertj.core.api.Assertions.assertThat;

import com.bakdata.kafka.KafkaContainerHelper;
import com.bakdata.kafka.KafkaProducerApplication;
import com.bakdata.kafka.KafkaTest;
import com.bakdata.kafka.KafkaTestClient;
import com.bakdata.kafka.ProducerApp;
import com.bakdata.kafka.ProducerBuilder;
import com.bakdata.kafka.ProducerRunnable;
import com.bakdata.kafka.SerializerConfig;
import com.bakdata.kafka.SimpleKafkaProducerApplication;
import com.bakdata.kafka.TestRecord;
import com.bakdata.kafka.util.ImprovedAdminClient;
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer;
import java.time.Duration;
@@ -45,29 +44,10 @@
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.kafka.KafkaContainer;

@Testcontainers
class RunProducerAppTest {
class RunProducerAppTest extends KafkaTest {
private static final Duration TIMEOUT = Duration.ofSeconds(10);
private static final String SCHEMA_REGISTRY_URL = "mock://";
@Container
private final KafkaContainer kafkaCluster = newKafkaCluster();

@BeforeEach
void setup() {
this.kafkaCluster.start();
}

@AfterEach
void tearDown() {
this.kafkaCluster.stop();
}

@Test
void shouldRunApp() throws InterruptedException {
@@ -88,24 +68,24 @@ public SerializerConfig defaultSerializationConfig() {
return new SerializerConfig(StringSerializer.class, SpecificAvroSerializer.class);
}
})) {
app.setBootstrapServers(this.kafkaCluster.getBootstrapServers());
app.setSchemaRegistryUrl(SCHEMA_REGISTRY_URL);
app.setBootstrapServers(this.getBootstrapServers());
final String schemaRegistryUrl = this.getSchemaRegistryUrl();
app.setSchemaRegistryUrl(schemaRegistryUrl);
app.setOutputTopic(output);
app.run();
final KafkaContainerHelper kafkaContainerHelper = new KafkaContainerHelper(this.kafkaCluster);
assertThat(kafkaContainerHelper.read()
final KafkaTestClient testClient = this.newTestClient();
assertThat(testClient.read()
.with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
.with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SpecificAvroDeserializer.class)
.with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL)
.<String, TestRecord>from(output, TIMEOUT))
.<String, TestRecord>from(output, POLL_TIMEOUT))
.hasSize(1)
.anySatisfy(kv -> {
assertThat(kv.key()).isEqualTo("foo");
assertThat(kv.value().getContent()).isEqualTo("bar");
});
app.clean();
Thread.sleep(TIMEOUT.toMillis());
try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) {
try (final ImprovedAdminClient admin = testClient.admin()) {
assertThat(admin.getTopicClient().exists(app.getOutputTopic()))
.as("Output topic is deleted")
.isFalse();
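RunProducerAppTest drops its hand-rolled Testcontainers lifecycle (a @Container field plus @BeforeEach/@AfterEach start and stop) by extending the new KafkaTest base class, which supplies the bootstrap servers, a schema registry URL, and a pre-configured test client. A sketch of the resulting skeleton, assuming KafkaTest manages the container for the duration of the test; the method names are taken from the diff, the test body is illustrative:

    import com.bakdata.kafka.KafkaTest;
    import com.bakdata.kafka.KafkaTestClient;
    import org.junit.jupiter.api.Test;

    class MyIntegrationTest extends KafkaTest {
        @Test
        void shouldRunApp() {
            // Endpoints come from the base class instead of a local container field
            final String bootstrapServers = this.getBootstrapServers();
            final String schemaRegistryUrl = this.getSchemaRegistryUrl();
            // Client pre-wired to the cluster managed by KafkaTest
            final KafkaTestClient testClient = this.newTestClient();
            // ... configure the application under test and assert on its output ...
        }
    }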
RunStreamsAppTest.java
@@ -24,63 +24,50 @@

package com.bakdata.kafka.integration;

import static com.bakdata.kafka.KafkaContainerHelper.DEFAULT_TOPIC_SETTINGS;
import static com.bakdata.kafka.TestUtil.newKafkaCluster;
import static java.util.Collections.emptyMap;
import static org.assertj.core.api.Assertions.assertThat;

import com.bakdata.kafka.KafkaContainerHelper;
import com.bakdata.kafka.KafkaStreamsApplication;
import com.bakdata.kafka.KafkaTest;
import com.bakdata.kafka.KafkaTestClient;
import com.bakdata.kafka.SenderBuilder.SimpleProducerRecord;
import com.bakdata.kafka.SimpleKafkaStreamsApplication;
import com.bakdata.kafka.test_applications.Mirror;
import com.bakdata.kafka.util.ImprovedAdminClient;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.junit.jupiter.MockitoExtension;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.kafka.KafkaContainer;

@Testcontainers
@ExtendWith(MockitoExtension.class)
class RunStreamsAppTest {
private static final Duration TIMEOUT = Duration.ofSeconds(10);
@Container
private final KafkaContainer kafkaCluster = newKafkaCluster();
class RunStreamsAppTest extends KafkaTest {

@Test
void shouldRunApp() {
final String input = "input";
final String output = "output";
final KafkaContainerHelper kafkaContainerHelper = new KafkaContainerHelper(this.kafkaCluster);
try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) {
admin.getTopicClient().createTopic(output, DEFAULT_TOPIC_SETTINGS, emptyMap());
}
final KafkaTestClient testClient = this.newTestClient();
testClient.createTopic(output);
try (final KafkaStreamsApplication<?> app = new SimpleKafkaStreamsApplication<>(Mirror::new)) {
app.setBootstrapServers(this.kafkaCluster.getBootstrapServers());
app.setBootstrapServers(this.getBootstrapServers());
app.setKafkaConfig(Map.of(
ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000"
));
app.setInputTopics(List.of(input));
app.setOutputTopic(output);
// run in Thread because the application blocks indefinitely
new Thread(app).start();
kafkaContainerHelper.send()
testClient.send()
.with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
.with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
.to(input, List.of(new KeyValue<>("foo", "bar")));
assertThat(kafkaContainerHelper.read()
.to(input, List.of(new SimpleProducerRecord<>("foo", "bar")));
assertThat(testClient.read()
.with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
.with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
.from(output, TIMEOUT))
.from(output, POLL_TIMEOUT))
.hasSize(1);
}
}
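Tests that still need raw admin access, such as RunProducerAppTest's check that clean() removed the output topic, now obtain the ImprovedAdminClient from the test client rather than from KafkaContainerHelper. A sketch mirroring that assertion (the wrapper class is illustrative):

    import static org.assertj.core.api.Assertions.assertThat;

    import com.bakdata.kafka.KafkaTestClient;
    import com.bakdata.kafka.util.ImprovedAdminClient;

    class AdminSketch {
        // Assert that a topic no longer exists, as RunProducerAppTest does after clean().
        static void assertTopicDeleted(final KafkaTestClient testClient, final String topic) {
            try (final ImprovedAdminClient admin = testClient.admin()) {
                assertThat(admin.getTopicClient().exists(topic))
                        .as("Output topic is deleted")
                        .isFalse();
            }
        }
    }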
(diffs for the remaining 15 changed files not shown)
