Commit ca26263

Run tests in parallel
philipp94831 committed Jan 20, 2025
1 parent d96a621 commit ca26263
Showing 7 changed files with 136 additions and 89 deletions.
5 changes: 0 additions & 5 deletions build.gradle.kts
@@ -8,11 +8,6 @@ plugins {
 allprojects {
     group = "com.bakdata.kafka"
 
-    tasks.withType<Test> {
-        maxParallelForks = 1 // Embedded Kafka does not reliably work in parallel since Kafka 3.0
-        useJUnitPlatform()
-    }
-
     repositories {
         mavenCentral()
         maven(url = "https://packages.confluent.io/maven/")
3 changes: 1 addition & 2 deletions gradle.properties
@@ -1,7 +1,6 @@
 version=3.4.1-SNAPSHOT
 org.gradle.caching=true
-# running Kafka Streams in parallel causes problems with colliding consumer groups
-org.gradle.parallel=false
+org.gradle.parallel=true
 kafkaVersion=3.8.1
 testContainersVersion=1.20.4
 confluentVersion=7.8.0
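A note on the property change above: org.gradle.parallel=true lets Gradle run tasks from different subprojects concurrently, so several integration tests may now talk to Kafka at the same time. The removed comment names the failure mode this used to cause: Kafka Streams uses the application.id as its consumer group id, so two tests sharing an id also share committed offsets. A minimal sketch of the isolation idea; the helper below is illustrative only, not code from this commit:

    import java.util.Map;
    import java.util.UUID;
    import org.apache.kafka.streams.StreamsConfig;

    final class UniqueAppIds {
        // Each test derives a unique application.id, and therefore a unique
        // consumer group, so parallel test runs do not share committed offsets.
        static Map<String, String> uniqueGroupConfig(final String testName) {
            return Map.of(StreamsConfig.APPLICATION_ID_CONFIG,
                    testName + "-" + UUID.randomUUID());
        }
    }

The commit itself additionally isolates the on-disk Streams state per test, as the file diffs below show.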
RunStreamsAppTest.java

@@ -31,16 +31,20 @@
 import com.bakdata.kafka.KafkaTestClient;
 import com.bakdata.kafka.SenderBuilder.SimpleProducerRecord;
 import com.bakdata.kafka.SimpleKafkaStreamsApplication;
+import com.bakdata.kafka.TestTopologyFactory;
 import com.bakdata.kafka.test_applications.Mirror;
+import java.nio.file.Path;
 import java.util.List;
-import java.util.Map;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 
 class RunStreamsAppTest extends KafkaTest {
+    @TempDir
+    private Path stateDir;
 
     @Test
     void shouldRunApp() {
@@ -50,9 +54,7 @@ void shouldRunApp() {
         testClient.createTopic(output);
         try (final KafkaStreamsApplication<?> app = new SimpleKafkaStreamsApplication<>(Mirror::new)) {
             app.setBootstrapServers(this.getBootstrapServers());
-            app.setKafkaConfig(Map.of(
-                    ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000"
-            ));
+            app.setKafkaConfig(TestTopologyFactory.createStreamsTestConfig(this.stateDir));
             app.setInputTopics(List.of(input));
             app.setOutputTopic(output);
             // run in Thread because the application blocks indefinitely
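The inline config map is replaced by TestTopologyFactory.createStreamsTestConfig(stateDir) here and in StreamsCleanUpTest below; the method body itself is not part of the diff shown. The following sketch is a plausible reconstruction inferred from the inline settings it replaces, with the state directory as the new ingredient; names and contents are assumptions, not the actual implementation:

    import java.nio.file.Path;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.streams.StreamsConfig;

    final class StreamsTestConfigSketch {
        // Hypothetical stand-in for TestTopologyFactory.createStreamsTestConfig.
        static Map<String, String> createStreamsTestConfig(final Path stateDir) {
            return Map.of(
                    // isolate RocksDB state per test instead of sharing the
                    // default /tmp/kafka-streams directory
                    StreamsConfig.STATE_DIR_CONFIG, stateDir.toString(),
                    // disable record caching so output can be asserted immediately
                    // (set inline in StreamsCleanUpTest before this commit)
                    StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, "0",
                    // detect dead test consumers quickly
                    // (set inline in both test classes before this commit)
                    ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000");
        }
    }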
StreamsCleanUpTest.java

@@ -31,10 +31,11 @@
 import com.bakdata.kafka.KafkaTestClient;
 import com.bakdata.kafka.SenderBuilder.SimpleProducerRecord;
 import com.bakdata.kafka.SimpleKafkaStreamsApplication;
+import com.bakdata.kafka.TestTopologyFactory;
 import com.bakdata.kafka.test_applications.WordCount;
 import com.bakdata.kafka.util.ImprovedAdminClient;
+import java.nio.file.Path;
 import java.util.List;
-import java.util.Map;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import lombok.extern.slf4j.Slf4j;
@@ -45,18 +46,20 @@
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StreamsConfig;
 import org.assertj.core.api.SoftAssertions;
 import org.assertj.core.api.junit.jupiter.InjectSoftAssertions;
 import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
 
 @Slf4j
 @ExtendWith(SoftAssertionsExtension.class)
 class StreamsCleanUpTest extends KafkaTest {
     @InjectSoftAssertions
     private SoftAssertions softly;
+    @TempDir
+    private Path stateDir;
 
     @Test
     void shouldClean() {
@@ -194,10 +197,7 @@ private KafkaStreamsApplication<?> createWordCountApplication() {
 
     private <T extends KafkaStreamsApplication<?>> T configure(final T application) {
         application.setBootstrapServers(this.getBootstrapServers());
-        application.setKafkaConfig(Map.of(
-                StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, "0",
-                ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000"
-        ));
+        application.setKafkaConfig(TestTopologyFactory.createStreamsTestConfig(this.stateDir));
         return application;
     }
 
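All three test classes in this commit gain a JUnit @TempDir field whose path is passed into the Kafka config. A self-contained illustration of the pattern, not taken from this repository:

    import java.nio.file.Files;
    import java.nio.file.Path;
    import org.junit.jupiter.api.Assertions;
    import org.junit.jupiter.api.Test;
    import org.junit.jupiter.api.io.TempDir;

    class TempDirPatternTest {
        @TempDir
        private Path stateDir; // JUnit injects a fresh directory per test instance

        @Test
        void eachTestSeesItsOwnDirectory() {
            // Concurrently running test classes therefore never share a
            // Kafka Streams state directory.
            Assertions.assertTrue(Files.isDirectory(this.stateDir));
        }
    }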
StreamsCleanUpRunnerTest.java

@@ -25,7 +25,6 @@
 package com.bakdata.kafka.integration;
 
 
-import static com.bakdata.kafka.integration.StreamsRunnerTest.configureApp;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 
@@ -57,6 +56,7 @@
 import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
 import io.confluent.kafka.serializers.KafkaAvroSerializer;
 import java.io.IOException;
+import java.nio.file.Path;
 import java.util.List;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
@@ -73,6 +73,7 @@
 import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
 import org.mockito.junit.jupiter.MockitoSettings;
@@ -86,39 +87,13 @@ class StreamsCleanUpRunnerTest extends KafkaTest {
     private SoftAssertions softly;
     @Mock
     private TopicHook topicHook;
+    @TempDir
+    private Path stateDir;
 
     static <K, V> KeyValue<K, V> toKeyValue(final ConsumerRecord<K, V> consumerRecord) {
         return new KeyValue<>(consumerRecord.key(), consumerRecord.value());
     }
 
-    private static ConfiguredStreamsApp<StreamsApp> createWordCountPatternApplication() {
-        return configureApp(new WordCountPattern(), StreamsTopicConfig.builder()
-                .inputPattern(Pattern.compile(".*_topic"))
-                .outputTopic("word_output")
-                .build());
-    }
-
-    private static ConfiguredStreamsApp<StreamsApp> createWordCountApplication() {
-        return configureApp(new WordCount(), StreamsTopicConfig.builder()
-                .inputTopics(List.of("word_input"))
-                .outputTopic("word_output")
-                .build());
-    }
-
-    private static ConfiguredStreamsApp<StreamsApp> createMirrorValueApplication() {
-        return configureApp(new MirrorValueWithAvro(), StreamsTopicConfig.builder()
-                .inputTopics(List.of("input"))
-                .outputTopic("output")
-                .build());
-    }
-
-    private static ConfiguredStreamsApp<StreamsApp> createMirrorKeyApplication() {
-        return configureApp(new MirrorKeyWithAvro(), StreamsTopicConfig.builder()
-                .inputTopics(List.of("input"))
-                .outputTopic("output")
-                .build());
-    }
-
     private static void reset(final ExecutableApp<?, StreamsCleanUpRunner, ?> app) {
         try (final StreamsCleanUpRunner cleanUpRunner = app.createCleanUpRunner()) {
             cleanUpRunner.reset();
@@ -131,9 +106,13 @@ private static void clean(final ExecutableApp<?, ? extends CleanUpRunner, ?> app
         }
     }
 
+    ConfiguredStreamsApp<StreamsApp> configureApp(final StreamsApp app, final StreamsTopicConfig topics) {
+        return StreamsRunnerTest.configureApp(app, topics, this.stateDir);
+    }
+
     @Test
     void shouldDeleteTopic() {
-        try (final ConfiguredStreamsApp<StreamsApp> app = createWordCountApplication();
+        try (final ConfiguredStreamsApp<StreamsApp> app = this.createWordCountApplication();
                 final ExecutableStreamsApp<StreamsApp> executableApp = app.withEndpoint(
                         this.createEndpointWithoutSchemaRegistry())) {
             final KafkaTestClient testClient = this.newTestClient();
@@ -171,7 +150,7 @@ void shouldDeleteTopic() {
 
     @Test
     void shouldDeleteConsumerGroup() {
-        try (final ConfiguredStreamsApp<StreamsApp> app = createWordCountApplication();
+        try (final ConfiguredStreamsApp<StreamsApp> app = this.createWordCountApplication();
                 final ExecutableStreamsApp<StreamsApp> executableApp = app.withEndpoint(
                         this.createEndpointWithoutSchemaRegistry())) {
             final KafkaTestClient testClient = this.newTestClient();
@@ -216,7 +195,7 @@
 
     @Test
     void shouldNotThrowAnErrorIfConsumerGroupDoesNotExist() {
-        try (final ConfiguredStreamsApp<StreamsApp> app = createWordCountApplication();
+        try (final ConfiguredStreamsApp<StreamsApp> app = this.createWordCountApplication();
                 final ExecutableStreamsApp<StreamsApp> executableApp = app.withEndpoint(
                         this.createEndpointWithoutSchemaRegistry())) {
             final KafkaTestClient testClient = this.newTestClient();
@@ -353,7 +332,7 @@ void shouldDeleteIntermediateTopics() {
 
     @Test
     void shouldDeleteState() {
-        try (final ConfiguredStreamsApp<StreamsApp> app = createWordCountApplication();
+        try (final ConfiguredStreamsApp<StreamsApp> app = this.createWordCountApplication();
                 final ExecutableStreamsApp<StreamsApp> executableApp = app.withEndpoint(
                         this.createEndpointWithoutSchemaRegistry())) {
             final KafkaTestClient testClient = this.newTestClient();
@@ -391,7 +370,7 @@ void shouldDeleteState() {
 
     @Test
     void shouldReprocessAlreadySeenRecords() {
-        try (final ConfiguredStreamsApp<StreamsApp> app = createWordCountApplication();
+        try (final ConfiguredStreamsApp<StreamsApp> app = this.createWordCountApplication();
                 final ExecutableStreamsApp<StreamsApp> executableApp = app.withEndpoint(
                         this.createEndpointWithoutSchemaRegistry())) {
             final KafkaTestClient testClient = this.newTestClient();
@@ -422,7 +401,7 @@
     @Test
     void shouldDeleteValueSchema()
             throws IOException, RestClientException {
-        try (final ConfiguredStreamsApp<StreamsApp> app = createMirrorValueApplication();
+        try (final ConfiguredStreamsApp<StreamsApp> app = this.createMirrorValueApplication();
                 final ExecutableStreamsApp<StreamsApp> executableApp = app.withEndpoint(this.createEndpoint());
                 final SchemaRegistryClient client = this.getSchemaRegistryClient()) {
             final TestRecord testRecord = TestRecord.newBuilder().setContent("key 1").build();
@@ -454,7 +433,7 @@ void shouldDeleteValueSchema()
     @Test
     void shouldDeleteKeySchema()
             throws IOException, RestClientException {
-        try (final ConfiguredStreamsApp<StreamsApp> app = createMirrorKeyApplication();
+        try (final ConfiguredStreamsApp<StreamsApp> app = this.createMirrorKeyApplication();
                 final ExecutableStreamsApp<StreamsApp> executableApp = app.withEndpoint(this.createEndpoint());
                 final SchemaRegistryClient client = this.getSchemaRegistryClient()) {
             final TestRecord testRecord = TestRecord.newBuilder().setContent("key 1").build();
@@ -585,15 +564,15 @@ void shouldCallCleanUpHookForAllTopics() {
 
     @Test
     void shouldNotThrowExceptionOnMissingInputTopic() {
-        try (final ConfiguredStreamsApp<StreamsApp> app = createMirrorKeyApplication();
+        try (final ConfiguredStreamsApp<StreamsApp> app = this.createMirrorKeyApplication();
                 final ExecutableStreamsApp<StreamsApp> executableApp = app.withEndpoint(this.createEndpoint())) {
             this.softly.assertThatCode(() -> clean(executableApp)).doesNotThrowAnyException();
         }
     }
 
     @Test
     void shouldThrowExceptionOnResetterError() {
-        try (final ConfiguredStreamsApp<StreamsApp> app = createMirrorKeyApplication();
+        try (final ConfiguredStreamsApp<StreamsApp> app = this.createMirrorKeyApplication();
                 final ExecutableStreamsApp<StreamsApp> executableApp = app.withEndpoint(this.createEndpoint());
                 final StreamsRunner runner = executableApp.createRunner()) {
             final KafkaTestClient testClient = this.newTestClient();
@@ -610,7 +589,7 @@
 
     @Test
     void shouldReprocessAlreadySeenRecordsWithPattern() {
-        try (final ConfiguredStreamsApp<StreamsApp> app = createWordCountPatternApplication();
+        try (final ConfiguredStreamsApp<StreamsApp> app = this.createWordCountPatternApplication();
                 final ExecutableStreamsApp<StreamsApp> executableApp = app.withEndpoint(
                         this.createEndpointWithoutSchemaRegistry())) {
             final KafkaTestClient testClient = this.newTestClient();
@@ -643,6 +622,34 @@ void shouldReprocessAlreadySeenRecordsWithPattern() {
         }
     }
 
+    private ConfiguredStreamsApp<StreamsApp> createWordCountPatternApplication() {
+        return this.configureApp(new WordCountPattern(), StreamsTopicConfig.builder()
+                .inputPattern(Pattern.compile(".*_topic"))
+                .outputTopic("word_output")
+                .build());
+    }
+
+    private ConfiguredStreamsApp<StreamsApp> createWordCountApplication() {
+        return this.configureApp(new WordCount(), StreamsTopicConfig.builder()
+                .inputTopics(List.of("word_input"))
+                .outputTopic("word_output")
+                .build());
+    }
+
+    private ConfiguredStreamsApp<StreamsApp> createMirrorValueApplication() {
+        return this.configureApp(new MirrorValueWithAvro(), StreamsTopicConfig.builder()
+                .inputTopics(List.of("input"))
+                .outputTopic("output")
+                .build());
+    }
+
+    private ConfiguredStreamsApp<StreamsApp> createMirrorKeyApplication() {
+        return this.configureApp(new MirrorKeyWithAvro(), StreamsTopicConfig.builder()
+                .inputTopics(List.of("input"))
+                .outputTopic("output")
+                .build());
+    }
+
     private void run(final ExecutableStreamsApp<?> app) {
         try (final StreamsRunner runner = app.createRunner()) {
             StreamsRunnerTest.run(runner);
@@ -653,15 +660,15 @@
 
     private ConfiguredStreamsApp<StreamsApp> createComplexApplication() {
         this.newTestClient().createTopic(ComplexTopologyApplication.THROUGH_TOPIC);
-        return configureApp(new ComplexTopologyApplication(), StreamsTopicConfig.builder()
+        return this.configureApp(new ComplexTopologyApplication(), StreamsTopicConfig.builder()
                 .inputTopics(List.of("input"))
                 .outputTopic("output")
                 .build());
     }
 
     private ConfiguredStreamsApp<StreamsApp> createComplexCleanUpHookApplication() {
         this.newTestClient().createTopic(ComplexTopologyApplication.THROUGH_TOPIC);
-        return configureApp(new ComplexTopologyApplication() {
+        return this.configureApp(new ComplexTopologyApplication() {
             @Override
             public StreamsCleanUpConfiguration setupCleanUp(
                     final EffectiveAppConfiguration<StreamsTopicConfig> configuration) {
