diff --git a/build.gradle.kts b/build.gradle.kts index 9f0deeff..7b85c4da 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -8,11 +8,6 @@ plugins { allprojects { group = "com.bakdata.kafka" - tasks.withType { - maxParallelForks = 1 // Embedded Kafka does not reliably work in parallel since Kafka 3.0 - useJUnitPlatform() - } - repositories { mavenCentral() maven(url = "https://packages.confluent.io/maven/") diff --git a/gradle.properties b/gradle.properties index dc74ebe6..85c90108 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,7 +1,6 @@ version=3.4.1-SNAPSHOT org.gradle.caching=true -# running Kafka Streams in parallel causes problems with colliding consumer groups -org.gradle.parallel=false +org.gradle.parallel=true kafkaVersion=3.8.1 testContainersVersion=1.20.4 confluentVersion=7.8.0 diff --git a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunStreamsAppTest.java b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunStreamsAppTest.java index d6742200..b3a99c2f 100644 --- a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunStreamsAppTest.java +++ b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunStreamsAppTest.java @@ -31,16 +31,20 @@ import com.bakdata.kafka.KafkaTestClient; import com.bakdata.kafka.SenderBuilder.SimpleProducerRecord; import com.bakdata.kafka.SimpleKafkaStreamsApplication; +import com.bakdata.kafka.TestTopologyFactory; import com.bakdata.kafka.test_applications.Mirror; +import java.nio.file.Path; import java.util.List; -import java.util.Map; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; class RunStreamsAppTest extends KafkaTest { + @TempDir + private Path stateDir; @Test void shouldRunApp() { @@ -50,9 +54,7 @@ void shouldRunApp() { testClient.createTopic(output); try (final KafkaStreamsApplication app = new SimpleKafkaStreamsApplication<>(Mirror::new)) { app.setBootstrapServers(this.getBootstrapServers()); - app.setKafkaConfig(Map.of( - ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000" - )); + app.setKafkaConfig(TestTopologyFactory.createStreamsTestConfig(this.stateDir)); app.setInputTopics(List.of(input)); app.setOutputTopic(output); // run in Thread because the application blocks indefinitely diff --git a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java index 4a1676cd..200af6f0 100644 --- a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java +++ b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java @@ -31,10 +31,11 @@ import com.bakdata.kafka.KafkaTestClient; import com.bakdata.kafka.SenderBuilder.SimpleProducerRecord; import com.bakdata.kafka.SimpleKafkaStreamsApplication; +import com.bakdata.kafka.TestTopologyFactory; import com.bakdata.kafka.test_applications.WordCount; import com.bakdata.kafka.util.ImprovedAdminClient; +import java.nio.file.Path; import java.util.List; -import java.util.Map; import java.util.stream.Collectors; import java.util.stream.Stream; import lombok.extern.slf4j.Slf4j; @@ -45,18 +46,20 @@ import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.StreamsConfig; import org.assertj.core.api.SoftAssertions; import org.assertj.core.api.junit.jupiter.InjectSoftAssertions; import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.io.TempDir; @Slf4j @ExtendWith(SoftAssertionsExtension.class) class StreamsCleanUpTest extends KafkaTest { @InjectSoftAssertions private SoftAssertions softly; + @TempDir + private Path stateDir; @Test void shouldClean() { @@ -194,10 +197,7 @@ private KafkaStreamsApplication createWordCountApplication() { private > T configure(final T application) { application.setBootstrapServers(this.getBootstrapServers()); - application.setKafkaConfig(Map.of( - StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, "0", - ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000" - )); + application.setKafkaConfig(TestTopologyFactory.createStreamsTestConfig(this.stateDir)); return application; } diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpRunnerTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpRunnerTest.java index e1fc3c96..016804b6 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpRunnerTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpRunnerTest.java @@ -25,7 +25,6 @@ package com.bakdata.kafka.integration; -import static com.bakdata.kafka.integration.StreamsRunnerTest.configureApp; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; @@ -57,6 +56,7 @@ import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; import io.confluent.kafka.serializers.KafkaAvroSerializer; import java.io.IOException; +import java.nio.file.Path; import java.util.List; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -73,6 +73,7 @@ import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.io.TempDir; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoSettings; @@ -86,39 +87,13 @@ class StreamsCleanUpRunnerTest extends KafkaTest { private SoftAssertions softly; @Mock private TopicHook topicHook; + @TempDir + private Path stateDir; static KeyValue toKeyValue(final ConsumerRecord consumerRecord) { return new KeyValue<>(consumerRecord.key(), consumerRecord.value()); } - private static ConfiguredStreamsApp createWordCountPatternApplication() { - return configureApp(new WordCountPattern(), StreamsTopicConfig.builder() - .inputPattern(Pattern.compile(".*_topic")) - .outputTopic("word_output") - .build()); - } - - private static ConfiguredStreamsApp createWordCountApplication() { - return configureApp(new WordCount(), StreamsTopicConfig.builder() - .inputTopics(List.of("word_input")) - .outputTopic("word_output") - .build()); - } - - private static ConfiguredStreamsApp createMirrorValueApplication() { - return configureApp(new MirrorValueWithAvro(), StreamsTopicConfig.builder() - .inputTopics(List.of("input")) - .outputTopic("output") - .build()); - } - - private static ConfiguredStreamsApp createMirrorKeyApplication() { - return configureApp(new MirrorKeyWithAvro(), StreamsTopicConfig.builder() - .inputTopics(List.of("input")) - .outputTopic("output") - .build()); - } - private static void reset(final ExecutableApp app) { try (final StreamsCleanUpRunner cleanUpRunner = app.createCleanUpRunner()) { cleanUpRunner.reset(); @@ -131,9 +106,13 @@ private static void clean(final ExecutableApp app } } + ConfiguredStreamsApp configureApp(final StreamsApp app, final StreamsTopicConfig topics) { + return StreamsRunnerTest.configureApp(app, topics, this.stateDir); + } + @Test void shouldDeleteTopic() { - try (final ConfiguredStreamsApp app = createWordCountApplication(); + try (final ConfiguredStreamsApp app = this.createWordCountApplication(); final ExecutableStreamsApp executableApp = app.withEndpoint( this.createEndpointWithoutSchemaRegistry())) { final KafkaTestClient testClient = this.newTestClient(); @@ -171,7 +150,7 @@ void shouldDeleteTopic() { @Test void shouldDeleteConsumerGroup() { - try (final ConfiguredStreamsApp app = createWordCountApplication(); + try (final ConfiguredStreamsApp app = this.createWordCountApplication(); final ExecutableStreamsApp executableApp = app.withEndpoint( this.createEndpointWithoutSchemaRegistry())) { final KafkaTestClient testClient = this.newTestClient(); @@ -216,7 +195,7 @@ void shouldDeleteConsumerGroup() { @Test void shouldNotThrowAnErrorIfConsumerGroupDoesNotExist() { - try (final ConfiguredStreamsApp app = createWordCountApplication(); + try (final ConfiguredStreamsApp app = this.createWordCountApplication(); final ExecutableStreamsApp executableApp = app.withEndpoint( this.createEndpointWithoutSchemaRegistry())) { final KafkaTestClient testClient = this.newTestClient(); @@ -353,7 +332,7 @@ void shouldDeleteIntermediateTopics() { @Test void shouldDeleteState() { - try (final ConfiguredStreamsApp app = createWordCountApplication(); + try (final ConfiguredStreamsApp app = this.createWordCountApplication(); final ExecutableStreamsApp executableApp = app.withEndpoint( this.createEndpointWithoutSchemaRegistry())) { final KafkaTestClient testClient = this.newTestClient(); @@ -391,7 +370,7 @@ void shouldDeleteState() { @Test void shouldReprocessAlreadySeenRecords() { - try (final ConfiguredStreamsApp app = createWordCountApplication(); + try (final ConfiguredStreamsApp app = this.createWordCountApplication(); final ExecutableStreamsApp executableApp = app.withEndpoint( this.createEndpointWithoutSchemaRegistry())) { final KafkaTestClient testClient = this.newTestClient(); @@ -422,7 +401,7 @@ void shouldReprocessAlreadySeenRecords() { @Test void shouldDeleteValueSchema() throws IOException, RestClientException { - try (final ConfiguredStreamsApp app = createMirrorValueApplication(); + try (final ConfiguredStreamsApp app = this.createMirrorValueApplication(); final ExecutableStreamsApp executableApp = app.withEndpoint(this.createEndpoint()); final SchemaRegistryClient client = this.getSchemaRegistryClient()) { final TestRecord testRecord = TestRecord.newBuilder().setContent("key 1").build(); @@ -454,7 +433,7 @@ void shouldDeleteValueSchema() @Test void shouldDeleteKeySchema() throws IOException, RestClientException { - try (final ConfiguredStreamsApp app = createMirrorKeyApplication(); + try (final ConfiguredStreamsApp app = this.createMirrorKeyApplication(); final ExecutableStreamsApp executableApp = app.withEndpoint(this.createEndpoint()); final SchemaRegistryClient client = this.getSchemaRegistryClient()) { final TestRecord testRecord = TestRecord.newBuilder().setContent("key 1").build(); @@ -585,7 +564,7 @@ void shouldCallCleanUpHookForAllTopics() { @Test void shouldNotThrowExceptionOnMissingInputTopic() { - try (final ConfiguredStreamsApp app = createMirrorKeyApplication(); + try (final ConfiguredStreamsApp app = this.createMirrorKeyApplication(); final ExecutableStreamsApp executableApp = app.withEndpoint(this.createEndpoint())) { this.softly.assertThatCode(() -> clean(executableApp)).doesNotThrowAnyException(); } @@ -593,7 +572,7 @@ void shouldNotThrowExceptionOnMissingInputTopic() { @Test void shouldThrowExceptionOnResetterError() { - try (final ConfiguredStreamsApp app = createMirrorKeyApplication(); + try (final ConfiguredStreamsApp app = this.createMirrorKeyApplication(); final ExecutableStreamsApp executableApp = app.withEndpoint(this.createEndpoint()); final StreamsRunner runner = executableApp.createRunner()) { final KafkaTestClient testClient = this.newTestClient(); @@ -610,7 +589,7 @@ void shouldThrowExceptionOnResetterError() { @Test void shouldReprocessAlreadySeenRecordsWithPattern() { - try (final ConfiguredStreamsApp app = createWordCountPatternApplication(); + try (final ConfiguredStreamsApp app = this.createWordCountPatternApplication(); final ExecutableStreamsApp executableApp = app.withEndpoint( this.createEndpointWithoutSchemaRegistry())) { final KafkaTestClient testClient = this.newTestClient(); @@ -643,6 +622,34 @@ void shouldReprocessAlreadySeenRecordsWithPattern() { } } + private ConfiguredStreamsApp createWordCountPatternApplication() { + return this.configureApp(new WordCountPattern(), StreamsTopicConfig.builder() + .inputPattern(Pattern.compile(".*_topic")) + .outputTopic("word_output") + .build()); + } + + private ConfiguredStreamsApp createWordCountApplication() { + return this.configureApp(new WordCount(), StreamsTopicConfig.builder() + .inputTopics(List.of("word_input")) + .outputTopic("word_output") + .build()); + } + + private ConfiguredStreamsApp createMirrorValueApplication() { + return this.configureApp(new MirrorValueWithAvro(), StreamsTopicConfig.builder() + .inputTopics(List.of("input")) + .outputTopic("output") + .build()); + } + + private ConfiguredStreamsApp createMirrorKeyApplication() { + return this.configureApp(new MirrorKeyWithAvro(), StreamsTopicConfig.builder() + .inputTopics(List.of("input")) + .outputTopic("output") + .build()); + } + private void run(final ExecutableStreamsApp app) { try (final StreamsRunner runner = app.createRunner()) { StreamsRunnerTest.run(runner); @@ -653,7 +660,7 @@ private void run(final ExecutableStreamsApp app) { private ConfiguredStreamsApp createComplexApplication() { this.newTestClient().createTopic(ComplexTopologyApplication.THROUGH_TOPIC); - return configureApp(new ComplexTopologyApplication(), StreamsTopicConfig.builder() + return this.configureApp(new ComplexTopologyApplication(), StreamsTopicConfig.builder() .inputTopics(List.of("input")) .outputTopic("output") .build()); @@ -661,7 +668,7 @@ private ConfiguredStreamsApp createComplexApplication() { private ConfiguredStreamsApp createComplexCleanUpHookApplication() { this.newTestClient().createTopic(ComplexTopologyApplication.THROUGH_TOPIC); - return configureApp(new ComplexTopologyApplication() { + return this.configureApp(new ComplexTopologyApplication() { @Override public StreamsCleanUpConfiguration setupCleanUp( final EffectiveAppConfiguration configuration) { diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsRunnerTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsRunnerTest.java index 4dbe0270..061b6c5c 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsRunnerTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsRunnerTest.java @@ -24,6 +24,7 @@ package com.bakdata.kafka.integration; +import static com.bakdata.kafka.TestTopologyFactory.createStreamsTestConfig; import static org.awaitility.Awaitility.await; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.verify; @@ -43,6 +44,7 @@ import com.bakdata.kafka.test_applications.LabeledInputTopics; import com.bakdata.kafka.test_applications.Mirror; import java.lang.Thread.UncaughtExceptionHandler; +import java.nio.file.Path; import java.time.Duration; import java.util.List; import java.util.Map; @@ -54,7 +56,6 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.KafkaStreams.State; import org.apache.kafka.streams.KafkaStreams.StateListener; -import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.MissingSourceTopicException; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler; @@ -65,6 +66,7 @@ import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.io.TempDir; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoSettings; @@ -80,6 +82,8 @@ class StreamsRunnerTest extends KafkaTest { private StateListener stateListener; @InjectSoftAssertions private SoftAssertions softly; + @TempDir + private Path stateDir; static Thread run(final StreamsRunner runner) { // run in Thread because the application blocks indefinitely @@ -90,42 +94,24 @@ static Thread run(final StreamsRunner runner) { return thread; } - static ConfiguredStreamsApp configureApp(final StreamsApp app, final StreamsTopicConfig topics) { - final AppConfiguration configuration = new AppConfiguration<>(topics, Map.of( - StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, "0", - ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000" - )); + static ConfiguredStreamsApp configureApp(final StreamsApp app, final StreamsTopicConfig topics, + final Path stateDir) { + final AppConfiguration configuration = + new AppConfiguration<>(topics, createStreamsTestConfig(stateDir)); return new ConfiguredStreamsApp<>(app, configuration); } - private static ConfiguredStreamsApp createMirrorApplication() { - return configureApp(new Mirror(), StreamsTopicConfig.builder() - .inputTopics(List.of("input")) - .outputTopic("output") - .build()); - } - - private static ConfiguredStreamsApp createLabeledInputTopicsApplication() { - return configureApp(new LabeledInputTopics(), StreamsTopicConfig.builder() - .labeledInputTopics(Map.of("label", List.of("input1", "input2"))) - .outputTopic("output") - .build()); - } - - private static ConfiguredStreamsApp createErrorApplication() { - return configureApp(new ErrorApplication(), StreamsTopicConfig.builder() - .inputTopics(List.of("input")) - .outputTopic("output") - .build()); - } - private static void awaitThreadIsDead(final Thread thread) { await("Thread is dead").atMost(Duration.ofSeconds(10)).until(() -> !thread.isAlive()); } + ConfiguredStreamsApp configureApp(final StreamsApp app, final StreamsTopicConfig topics) { + return configureApp(app, topics, this.stateDir); + } + @Test void shouldRunApp() { - try (final ConfiguredStreamsApp app = createMirrorApplication(); + try (final ConfiguredStreamsApp app = this.createMirrorApplication(); final StreamsRunner runner = app.withEndpoint(this.createEndpointWithoutSchemaRegistry()) .createRunner()) { final String inputTopic = app.getTopics().getInputTopics().get(0); @@ -147,7 +133,7 @@ void shouldRunApp() { @Test void shouldUseMultipleLabeledInputTopics() { - try (final ConfiguredStreamsApp app = createLabeledInputTopicsApplication(); + try (final ConfiguredStreamsApp app = this.createLabeledInputTopicsApplication(); final StreamsRunner runner = app.withEndpoint(this.createEndpointWithoutSchemaRegistry()) .createRunner()) { final List inputTopics = app.getTopics().getLabeledInputTopics().get("label"); @@ -178,7 +164,7 @@ void shouldUseMultipleLabeledInputTopics() { @Test void shouldThrowOnMissingInputTopic() { when(this.uncaughtExceptionHandler.handle(any())).thenReturn(StreamThreadExceptionResponse.SHUTDOWN_CLIENT); - try (final ConfiguredStreamsApp app = createMirrorApplication(); + try (final ConfiguredStreamsApp app = this.createMirrorApplication(); final StreamsRunner runner = app.withEndpoint(this.createEndpointWithoutSchemaRegistry()) .createRunner(StreamsExecutionOptions.builder() .stateListener(() -> this.stateListener) @@ -197,7 +183,7 @@ void shouldThrowOnMissingInputTopic() { @Test void shouldCloseOnMapError() { when(this.uncaughtExceptionHandler.handle(any())).thenReturn(StreamThreadExceptionResponse.SHUTDOWN_CLIENT); - try (final ConfiguredStreamsApp app = createErrorApplication(); + try (final ConfiguredStreamsApp app = this.createErrorApplication(); final StreamsRunner runner = app.withEndpoint(this.createEndpointWithoutSchemaRegistry()) .createRunner(StreamsExecutionOptions.builder() .stateListener(() -> this.stateListener) @@ -222,6 +208,27 @@ void shouldCloseOnMapError() { } } + private ConfiguredStreamsApp createMirrorApplication() { + return this.configureApp(new Mirror(), StreamsTopicConfig.builder() + .inputTopics(List.of("input")) + .outputTopic("output") + .build()); + } + + private ConfiguredStreamsApp createLabeledInputTopicsApplication() { + return this.configureApp(new LabeledInputTopics(), StreamsTopicConfig.builder() + .labeledInputTopics(Map.of("label", List.of("input1", "input2"))) + .outputTopic("output") + .build()); + } + + private ConfiguredStreamsApp createErrorApplication() { + return this.configureApp(new ErrorApplication(), StreamsTopicConfig.builder() + .inputTopics(List.of("input")) + .outputTopic("output") + .build()); + } + @Getter private static class CapturingUncaughtExceptionHandler implements UncaughtExceptionHandler { private Throwable lastException; diff --git a/streams-bootstrap-test/src/main/java/com/bakdata/kafka/TestTopologyFactory.java b/streams-bootstrap-test/src/main/java/com/bakdata/kafka/TestTopologyFactory.java index 2c6680b9..883a027b 100644 --- a/streams-bootstrap-test/src/main/java/com/bakdata/kafka/TestTopologyFactory.java +++ b/streams-bootstrap-test/src/main/java/com/bakdata/kafka/TestTopologyFactory.java @@ -31,12 +31,16 @@ import io.confluent.kafka.schemaregistry.SchemaProvider; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientFactory; +import java.nio.file.Path; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.UUID; import lombok.AccessLevel; import lombok.RequiredArgsConstructor; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.streams.StreamsConfig; /** * Class that provides helpers for using Fluent Kafka Streams Tests with {@link ConfiguredStreamsApp} @@ -45,6 +49,11 @@ public final class TestTopologyFactory { private static final String MOCK_URL_PREFIX = "mock://"; + private static final Map STREAMS_TEST_CONFIG = Map.of( + // Disable caching to allow immediate aggregations + StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, Long.toString(0L), + ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(10_000) + ); private final String schemaRegistryUrl; /** @@ -85,6 +94,34 @@ public static Configurator createConfigurator(final TestTopology testTopol return new Configurator(testTopology.getProperties()); } + /** + * Create a new Kafka Streams config suitable for test environments. This includes setting the following + * parameters in addition to {@link #createStreamsTestConfig()}: + *
    + *
  • {@link StreamsConfig#STATE_DIR_CONFIG}=provided directory
  • + *
+ * @param stateDir directory to use for storing Kafka Streams state + * @return Kafka Streams config + * @see #createStreamsTestConfig() + */ + public static Map createStreamsTestConfig(final Path stateDir) { + final Map config = new HashMap<>(createStreamsTestConfig()); + config.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString()); + return Map.copyOf(config); + } + + /** + * Create a new Kafka Streams config suitable for test environments. This includes setting the following parameters: + *
    + *
  • {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG}=0
  • + *
  • {@link ConsumerConfig#SESSION_TIMEOUT_MS_CONFIG}=10000
  • + *
+ * @return Kafka Streams config + */ + public static Map createStreamsTestConfig() { + return STREAMS_TEST_CONFIG; + } + /** * Get Schema Registry URL if configured * @return Schema Registry URL