Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Run tests in parallel #274

Merged
merged 51 commits into from
Jan 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
d313f77
Add methods for simplified testing of Kafka endpoints
philipp94831 Jan 16, 2025
aea9e77
Add methods for simplified testing of Kafka endpoints
philipp94831 Jan 16, 2025
eb08875
Add methods for simplified testing of Kafka endpoints
philipp94831 Jan 16, 2025
b8b38dc
Add methods for simplified testing of Kafka endpoints
philipp94831 Jan 16, 2025
3c4afb3
Add methods for simplified testing of Kafka endpoints
philipp94831 Jan 16, 2025
9cffdcf
Add methods for simplified testing of Kafka endpoints
philipp94831 Jan 16, 2025
9a3a968
Bump CI versions
yannick-roeder Jan 16, 2025
ad822fe
Fix Helm publish
yannick-roeder Jan 16, 2025
5ef5e4c
Fix Helm publish
yannick-roeder Jan 16, 2025
f191c92
Fix Helm publish
yannick-roeder Jan 16, 2025
faac25d
Fix Helm publish
yannick-roeder Jan 16, 2025
1309e18
Fix Helm publish
yannick-roeder Jan 16, 2025
0dbeb38
Fix Helm publish
yannick-roeder Jan 16, 2025
8759f60
Merge remote-tracking branch 'origin/fix/ci' into feature/test-improv…
philipp94831 Jan 16, 2025
f353ec1
Add methods for simplified testing of Kafka endpoints
philipp94831 Jan 16, 2025
692796c
Merge remote-tracking branch 'origin/master' into feature/test-improv…
philipp94831 Jan 17, 2025
21a0293
Update
philipp94831 Jan 17, 2025
f2fd574
Update
philipp94831 Jan 17, 2025
5637380
Update
philipp94831 Jan 17, 2025
16de422
Add docs
philipp94831 Jan 17, 2025
b8b23e2
Add docs
philipp94831 Jan 17, 2025
90f6856
Use Awaitility
philipp94831 Jan 17, 2025
f68a1c3
Use Awaitility
philipp94831 Jan 17, 2025
25ed943
Use Awaitility
philipp94831 Jan 17, 2025
0a2d27b
Use Awaitility
philipp94831 Jan 17, 2025
4bf9fb5
Update
philipp94831 Jan 17, 2025
282a7be
Merge branch 'feature/test-improvements' into feature/awaitility
philipp94831 Jan 17, 2025
d0c5e3c
Update
philipp94831 Jan 17, 2025
8307752
Update
philipp94831 Jan 17, 2025
dbe16f7
Update
philipp94831 Jan 17, 2025
464d759
Update
philipp94831 Jan 17, 2025
9c5f6dc
Update
philipp94831 Jan 17, 2025
afa135c
Merge branch 'feature/test-improvements' into feature/awaitility
philipp94831 Jan 17, 2025
97ccd80
Update
philipp94831 Jan 17, 2025
0bc1440
Update
philipp94831 Jan 17, 2025
857a910
Update
philipp94831 Jan 17, 2025
83fc24c
Update
philipp94831 Jan 17, 2025
9f667ea
Update
philipp94831 Jan 17, 2025
3c98bea
Update
philipp94831 Jan 17, 2025
1601954
Update
philipp94831 Jan 20, 2025
a1ecd38
Update
philipp94831 Jan 20, 2025
a4b4c33
Merge branch 'feature/test-improvements' into feature/awaitility
philipp94831 Jan 20, 2025
d9a0182
Run tests in parallel
philipp94831 Jan 20, 2025
09f386c
Address review
philipp94831 Jan 20, 2025
3d7300b
Address review
philipp94831 Jan 20, 2025
2045f42
Address review
philipp94831 Jan 20, 2025
87e21cd
Address review
philipp94831 Jan 20, 2025
89c2edf
Merge branch 'feature/test-improvements' into feature/awaitility
philipp94831 Jan 20, 2025
42c3270
Merge remote-tracking branch 'origin/master' into feature/awaitility
philipp94831 Jan 20, 2025
6a45e34
Merge branch 'feature/awaitility' into feature/parallel-tests
philipp94831 Jan 20, 2025
5397ba9
Merge remote-tracking branch 'origin/master' into feature/parallel-tests
philipp94831 Jan 20, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,6 @@ plugins {
allprojects {
group = "com.bakdata.kafka"

tasks.withType<Test> {
maxParallelForks = 1 // Embedded Kafka does not reliably work in parallel since Kafka 3.0
useJUnitPlatform()
philipp94831 marked this conversation as resolved.
Show resolved Hide resolved
}

repositories {
mavenCentral()
maven(url = "https://packages.confluent.io/maven/")
Expand Down
3 changes: 1 addition & 2 deletions gradle.properties
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
version=3.4.1-SNAPSHOT
org.gradle.caching=true
# running Kafka Streams in parallel causes problems with colliding consumer groups
org.gradle.parallel=false
org.gradle.parallel=true
kafkaVersion=3.8.1
testContainersVersion=1.20.4
confluentVersion=7.8.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,20 @@
import com.bakdata.kafka.KafkaTestClient;
import com.bakdata.kafka.SenderBuilder.SimpleProducerRecord;
import com.bakdata.kafka.SimpleKafkaStreamsApplication;
import com.bakdata.kafka.TestTopologyFactory;
import com.bakdata.kafka.test_applications.Mirror;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

class RunStreamsAppTest extends KafkaTest {
@TempDir
private Path stateDir;

@Test
void shouldRunApp() {
Expand All @@ -50,9 +54,7 @@ void shouldRunApp() {
testClient.createTopic(output);
try (final KafkaStreamsApplication<?> app = new SimpleKafkaStreamsApplication<>(Mirror::new)) {
app.setBootstrapServers(this.getBootstrapServers());
app.setKafkaConfig(Map.of(
ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000"
));
app.setKafkaConfig(TestTopologyFactory.createStreamsTestConfig(this.stateDir));
app.setInputTopics(List.of(input));
app.setOutputTopic(output);
// run in Thread because the application blocks indefinitely
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,11 @@
import com.bakdata.kafka.KafkaTestClient;
import com.bakdata.kafka.SenderBuilder.SimpleProducerRecord;
import com.bakdata.kafka.SimpleKafkaStreamsApplication;
import com.bakdata.kafka.TestTopologyFactory;
import com.bakdata.kafka.test_applications.WordCount;
import com.bakdata.kafka.util.ImprovedAdminClient;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -45,18 +46,20 @@
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.assertj.core.api.SoftAssertions;
import org.assertj.core.api.junit.jupiter.InjectSoftAssertions;
import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;

@Slf4j
@ExtendWith(SoftAssertionsExtension.class)
class StreamsCleanUpTest extends KafkaTest {
@InjectSoftAssertions
private SoftAssertions softly;
@TempDir
private Path stateDir;

@Test
void shouldClean() {
Expand Down Expand Up @@ -194,10 +197,7 @@ private KafkaStreamsApplication<?> createWordCountApplication() {

private <T extends KafkaStreamsApplication<?>> T configure(final T application) {
application.setBootstrapServers(this.getBootstrapServers());
application.setKafkaConfig(Map.of(
StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, "0",
ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000"
));
application.setKafkaConfig(TestTopologyFactory.createStreamsTestConfig(this.stateDir));
return application;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
package com.bakdata.kafka.integration;


import static com.bakdata.kafka.integration.StreamsRunnerTest.configureApp;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;

Expand Down Expand Up @@ -57,6 +56,7 @@
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import java.io.IOException;
import java.nio.file.Path;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
Expand All @@ -73,6 +73,7 @@
import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
Expand All @@ -86,39 +87,13 @@ class StreamsCleanUpRunnerTest extends KafkaTest {
private SoftAssertions softly;
@Mock
private TopicHook topicHook;
@TempDir
private Path stateDir;

static <K, V> KeyValue<K, V> toKeyValue(final ConsumerRecord<K, V> consumerRecord) {
return new KeyValue<>(consumerRecord.key(), consumerRecord.value());
}

private static ConfiguredStreamsApp<StreamsApp> createWordCountPatternApplication() {
return configureApp(new WordCountPattern(), StreamsTopicConfig.builder()
.inputPattern(Pattern.compile(".*_topic"))
.outputTopic("word_output")
.build());
}

private static ConfiguredStreamsApp<StreamsApp> createWordCountApplication() {
return configureApp(new WordCount(), StreamsTopicConfig.builder()
.inputTopics(List.of("word_input"))
.outputTopic("word_output")
.build());
}

private static ConfiguredStreamsApp<StreamsApp> createMirrorValueApplication() {
return configureApp(new MirrorValueWithAvro(), StreamsTopicConfig.builder()
.inputTopics(List.of("input"))
.outputTopic("output")
.build());
}

private static ConfiguredStreamsApp<StreamsApp> createMirrorKeyApplication() {
return configureApp(new MirrorKeyWithAvro(), StreamsTopicConfig.builder()
.inputTopics(List.of("input"))
.outputTopic("output")
.build());
}

private static void reset(final ExecutableApp<?, StreamsCleanUpRunner, ?> app) {
try (final StreamsCleanUpRunner cleanUpRunner = app.createCleanUpRunner()) {
cleanUpRunner.reset();
Expand All @@ -131,9 +106,13 @@ private static void clean(final ExecutableApp<?, ? extends CleanUpRunner, ?> app
}
}

ConfiguredStreamsApp<StreamsApp> configureApp(final StreamsApp app, final StreamsTopicConfig topics) {
return StreamsRunnerTest.configureApp(app, topics, this.stateDir);
}

@Test
void shouldDeleteTopic() {
try (final ConfiguredStreamsApp<StreamsApp> app = createWordCountApplication();
try (final ConfiguredStreamsApp<StreamsApp> app = this.createWordCountApplication();
final ExecutableStreamsApp<StreamsApp> executableApp = app.withEndpoint(
this.createEndpointWithoutSchemaRegistry())) {
final KafkaTestClient testClient = this.newTestClient();
Expand Down Expand Up @@ -171,7 +150,7 @@ void shouldDeleteTopic() {

@Test
void shouldDeleteConsumerGroup() {
try (final ConfiguredStreamsApp<StreamsApp> app = createWordCountApplication();
try (final ConfiguredStreamsApp<StreamsApp> app = this.createWordCountApplication();
final ExecutableStreamsApp<StreamsApp> executableApp = app.withEndpoint(
this.createEndpointWithoutSchemaRegistry())) {
final KafkaTestClient testClient = this.newTestClient();
Expand Down Expand Up @@ -216,7 +195,7 @@ void shouldDeleteConsumerGroup() {

@Test
void shouldNotThrowAnErrorIfConsumerGroupDoesNotExist() {
try (final ConfiguredStreamsApp<StreamsApp> app = createWordCountApplication();
try (final ConfiguredStreamsApp<StreamsApp> app = this.createWordCountApplication();
final ExecutableStreamsApp<StreamsApp> executableApp = app.withEndpoint(
this.createEndpointWithoutSchemaRegistry())) {
final KafkaTestClient testClient = this.newTestClient();
Expand Down Expand Up @@ -353,7 +332,7 @@ void shouldDeleteIntermediateTopics() {

@Test
void shouldDeleteState() {
try (final ConfiguredStreamsApp<StreamsApp> app = createWordCountApplication();
try (final ConfiguredStreamsApp<StreamsApp> app = this.createWordCountApplication();
final ExecutableStreamsApp<StreamsApp> executableApp = app.withEndpoint(
this.createEndpointWithoutSchemaRegistry())) {
final KafkaTestClient testClient = this.newTestClient();
Expand Down Expand Up @@ -391,7 +370,7 @@ void shouldDeleteState() {

@Test
void shouldReprocessAlreadySeenRecords() {
try (final ConfiguredStreamsApp<StreamsApp> app = createWordCountApplication();
try (final ConfiguredStreamsApp<StreamsApp> app = this.createWordCountApplication();
final ExecutableStreamsApp<StreamsApp> executableApp = app.withEndpoint(
this.createEndpointWithoutSchemaRegistry())) {
final KafkaTestClient testClient = this.newTestClient();
Expand Down Expand Up @@ -422,7 +401,7 @@ void shouldReprocessAlreadySeenRecords() {
@Test
void shouldDeleteValueSchema()
throws IOException, RestClientException {
try (final ConfiguredStreamsApp<StreamsApp> app = createMirrorValueApplication();
try (final ConfiguredStreamsApp<StreamsApp> app = this.createMirrorValueApplication();
final ExecutableStreamsApp<StreamsApp> executableApp = app.withEndpoint(this.createEndpoint());
final SchemaRegistryClient client = this.getSchemaRegistryClient()) {
final TestRecord testRecord = TestRecord.newBuilder().setContent("key 1").build();
Expand Down Expand Up @@ -454,7 +433,7 @@ void shouldDeleteValueSchema()
@Test
void shouldDeleteKeySchema()
throws IOException, RestClientException {
try (final ConfiguredStreamsApp<StreamsApp> app = createMirrorKeyApplication();
try (final ConfiguredStreamsApp<StreamsApp> app = this.createMirrorKeyApplication();
final ExecutableStreamsApp<StreamsApp> executableApp = app.withEndpoint(this.createEndpoint());
final SchemaRegistryClient client = this.getSchemaRegistryClient()) {
final TestRecord testRecord = TestRecord.newBuilder().setContent("key 1").build();
Expand Down Expand Up @@ -585,15 +564,15 @@ void shouldCallCleanUpHookForAllTopics() {

@Test
void shouldNotThrowExceptionOnMissingInputTopic() {
try (final ConfiguredStreamsApp<StreamsApp> app = createMirrorKeyApplication();
try (final ConfiguredStreamsApp<StreamsApp> app = this.createMirrorKeyApplication();
final ExecutableStreamsApp<StreamsApp> executableApp = app.withEndpoint(this.createEndpoint())) {
this.softly.assertThatCode(() -> clean(executableApp)).doesNotThrowAnyException();
}
}

@Test
void shouldThrowExceptionOnResetterError() {
try (final ConfiguredStreamsApp<StreamsApp> app = createMirrorKeyApplication();
try (final ConfiguredStreamsApp<StreamsApp> app = this.createMirrorKeyApplication();
final ExecutableStreamsApp<StreamsApp> executableApp = app.withEndpoint(this.createEndpoint());
final StreamsRunner runner = executableApp.createRunner()) {
final KafkaTestClient testClient = this.newTestClient();
Expand All @@ -610,7 +589,7 @@ void shouldThrowExceptionOnResetterError() {

@Test
void shouldReprocessAlreadySeenRecordsWithPattern() {
try (final ConfiguredStreamsApp<StreamsApp> app = createWordCountPatternApplication();
try (final ConfiguredStreamsApp<StreamsApp> app = this.createWordCountPatternApplication();
final ExecutableStreamsApp<StreamsApp> executableApp = app.withEndpoint(
this.createEndpointWithoutSchemaRegistry())) {
final KafkaTestClient testClient = this.newTestClient();
Expand Down Expand Up @@ -643,6 +622,34 @@ void shouldReprocessAlreadySeenRecordsWithPattern() {
}
}

private ConfiguredStreamsApp<StreamsApp> createWordCountPatternApplication() {
return this.configureApp(new WordCountPattern(), StreamsTopicConfig.builder()
.inputPattern(Pattern.compile(".*_topic"))
.outputTopic("word_output")
.build());
}

private ConfiguredStreamsApp<StreamsApp> createWordCountApplication() {
return this.configureApp(new WordCount(), StreamsTopicConfig.builder()
.inputTopics(List.of("word_input"))
.outputTopic("word_output")
.build());
}

private ConfiguredStreamsApp<StreamsApp> createMirrorValueApplication() {
return this.configureApp(new MirrorValueWithAvro(), StreamsTopicConfig.builder()
.inputTopics(List.of("input"))
.outputTopic("output")
.build());
}

private ConfiguredStreamsApp<StreamsApp> createMirrorKeyApplication() {
return this.configureApp(new MirrorKeyWithAvro(), StreamsTopicConfig.builder()
.inputTopics(List.of("input"))
.outputTopic("output")
.build());
}

private void run(final ExecutableStreamsApp<?> app) {
try (final StreamsRunner runner = app.createRunner()) {
StreamsRunnerTest.run(runner);
Expand All @@ -653,15 +660,15 @@ private void run(final ExecutableStreamsApp<?> app) {

private ConfiguredStreamsApp<StreamsApp> createComplexApplication() {
this.newTestClient().createTopic(ComplexTopologyApplication.THROUGH_TOPIC);
return configureApp(new ComplexTopologyApplication(), StreamsTopicConfig.builder()
return this.configureApp(new ComplexTopologyApplication(), StreamsTopicConfig.builder()
.inputTopics(List.of("input"))
.outputTopic("output")
.build());
}

private ConfiguredStreamsApp<StreamsApp> createComplexCleanUpHookApplication() {
this.newTestClient().createTopic(ComplexTopologyApplication.THROUGH_TOPIC);
return configureApp(new ComplexTopologyApplication() {
return this.configureApp(new ComplexTopologyApplication() {
@Override
public StreamsCleanUpConfiguration setupCleanUp(
final EffectiveAppConfiguration<StreamsTopicConfig> configuration) {
Expand Down
Loading