From d603699dd5ed799c35e4dcb53f3369f4d678b5af Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Tue, 16 Apr 2024 10:06:57 +0200 Subject: [PATCH] Create v3 --- .../kafka/LargeMessageProducerApp.java | 3 +- .../bakdata/kafka/LargeMessageStreamsApp.java | 3 +- .../kafka/KafkaProducerApplication.java | 9 +-- .../kafka/KafkaStreamsApplication.java | 9 +-- ...nfiguration.java => AppConfiguration.java} | 19 ++++--- .../bakdata/kafka/ConfiguredProducerApp.java | 6 +- .../bakdata/kafka/ConfiguredStreamsApp.java | 12 ++-- ...on.java => EffectiveAppConfiguration.java} | 22 ++++--- .../EffectiveProducerAppConfiguration.java | 57 ------------------- .../bakdata/kafka/ExecutableProducerApp.java | 11 ++-- .../java/com/bakdata/kafka/ProducerApp.java | 5 +- .../kafka/ProducerAppConfiguration.java | 47 --------------- .../com/bakdata/kafka/ProducerBuilder.java | 11 ++-- .../com/bakdata/kafka/ProducerRunner.java | 4 ++ .../java/com/bakdata/kafka/StreamsApp.java | 5 +- .../com/bakdata/kafka/TopologyBuilder.java | 11 ++-- .../com/bakdata/kafka/AvroMirrorTest.java | 6 +- .../kafka/ConfiguredProducerAppTest.java | 27 +++++---- .../kafka/ConfiguredStreamsAppTest.java | 27 +++++---- .../kafka/ExecutableProducerAppTest.java | 29 +++------- .../kafka/ExecutableStreamsAppTest.java | 29 +++------- .../ProducerCleanUpRunnerTest.java | 5 +- .../kafka/integration/ProducerRunnerTest.java | 6 +- .../integration/StreamsCleanUpRunnerTest.java | 5 +- .../kafka/integration/StreamsRunnerTest.java | 9 +-- .../kafka/util/TopologyInformationTest.java | 6 +- 26 files changed, 124 insertions(+), 259 deletions(-) rename streams-bootstrap/src/main/java/com/bakdata/kafka/{StreamsAppConfiguration.java => AppConfiguration.java} (79%) rename streams-bootstrap/src/main/java/com/bakdata/kafka/{EffectiveStreamsAppConfiguration.java => EffectiveAppConfiguration.java} (76%) delete mode 100644 streams-bootstrap/src/main/java/com/bakdata/kafka/EffectiveProducerAppConfiguration.java delete mode 100644 streams-bootstrap/src/main/java/com/bakdata/kafka/ProducerAppConfiguration.java diff --git a/streams-bootstrap-large-messages/src/main/java/com/bakdata/kafka/LargeMessageProducerApp.java b/streams-bootstrap-large-messages/src/main/java/com/bakdata/kafka/LargeMessageProducerApp.java index 70f80072..d500b7b6 100644 --- a/streams-bootstrap-large-messages/src/main/java/com/bakdata/kafka/LargeMessageProducerApp.java +++ b/streams-bootstrap-large-messages/src/main/java/com/bakdata/kafka/LargeMessageProducerApp.java @@ -30,7 +30,8 @@ public interface LargeMessageProducerApp extends ProducerApp { @Override - default ProducerCleanUpConfiguration setupCleanUp(final EffectiveProducerAppConfiguration configuration) { + default ProducerCleanUpConfiguration setupCleanUp( + final EffectiveAppConfiguration configuration) { final ProducerCleanUpConfiguration configurer = ProducerApp.super.setupCleanUp(configuration); return configurer.registerTopicHook( LargeMessageKafkaApplicationUtils.createLargeMessageCleanUpHook(configuration.getKafkaProperties())); diff --git a/streams-bootstrap-large-messages/src/main/java/com/bakdata/kafka/LargeMessageStreamsApp.java b/streams-bootstrap-large-messages/src/main/java/com/bakdata/kafka/LargeMessageStreamsApp.java index 08a1b766..aa74efea 100644 --- a/streams-bootstrap-large-messages/src/main/java/com/bakdata/kafka/LargeMessageStreamsApp.java +++ b/streams-bootstrap-large-messages/src/main/java/com/bakdata/kafka/LargeMessageStreamsApp.java @@ -30,7 +30,8 @@ public interface LargeMessageStreamsApp extends StreamsApp { @Override - default StreamsCleanUpConfiguration setupCleanUp(final EffectiveStreamsAppConfiguration configuration) { + default StreamsCleanUpConfiguration setupCleanUp( + final EffectiveAppConfiguration configuration) { final StreamsCleanUpConfiguration configurer = StreamsApp.super.setupCleanUp(configuration); return configurer.registerTopicHook( LargeMessageKafkaApplicationUtils.createLargeMessageCleanUpHook(configuration.getKafkaProperties())); diff --git a/streams-bootstrap-picocli/src/main/java/com/bakdata/kafka/KafkaProducerApplication.java b/streams-bootstrap-picocli/src/main/java/com/bakdata/kafka/KafkaProducerApplication.java index d060c6e4..929f4719 100644 --- a/streams-bootstrap-picocli/src/main/java/com/bakdata/kafka/KafkaProducerApplication.java +++ b/streams-bootstrap-picocli/src/main/java/com/bakdata/kafka/KafkaProducerApplication.java @@ -79,13 +79,10 @@ public void run() { * Create configuration to configure app * @return configuration */ - public final ProducerAppConfiguration createConfiguration() { + public final AppConfiguration createConfiguration() { final ProducerTopicConfig topics = this.createTopicConfig(); final Map kafkaConfig = this.getKafkaConfig(); - return ProducerAppConfiguration.builder() - .topics(topics) - .kafkaConfig(kafkaConfig) - .build(); + return new AppConfiguration<>(topics, kafkaConfig); } @Override @@ -107,7 +104,7 @@ public final ProducerTopicConfig createTopicConfig() { @Override public final ConfiguredProducerApp createConfiguredApp(final boolean cleanUp) { final ProducerApp app = this.createApp(cleanUp); - final ProducerAppConfiguration configuration = this.createConfiguration(); + final AppConfiguration configuration = this.createConfiguration(); return new ConfiguredProducerApp<>(app, configuration); } } diff --git a/streams-bootstrap-picocli/src/main/java/com/bakdata/kafka/KafkaStreamsApplication.java b/streams-bootstrap-picocli/src/main/java/com/bakdata/kafka/KafkaStreamsApplication.java index 7c320341..374ae1e6 100644 --- a/streams-bootstrap-picocli/src/main/java/com/bakdata/kafka/KafkaStreamsApplication.java +++ b/streams-bootstrap-picocli/src/main/java/com/bakdata/kafka/KafkaStreamsApplication.java @@ -125,13 +125,10 @@ public void reset() { * Create configuration to configure app * @return configuration */ - public final StreamsAppConfiguration createConfiguration() { + public final AppConfiguration createConfiguration() { final StreamsTopicConfig topics = this.createTopicConfig(); final Map kafkaConfig = this.getKafkaConfig(); - return StreamsAppConfiguration.builder() - .topics(topics) - .kafkaConfig(kafkaConfig) - .build(); + return new AppConfiguration<>(topics, kafkaConfig); } @Override @@ -164,7 +161,7 @@ public final StreamsTopicConfig createTopicConfig() { @Override public final ConfiguredStreamsApp createConfiguredApp(final boolean cleanUp) { final StreamsApp app = this.createApp(cleanUp); - final StreamsAppConfiguration configuration = this.createConfiguration(); + final AppConfiguration configuration = this.createConfiguration(); return new ConfiguredStreamsApp<>(app, configuration); } diff --git a/streams-bootstrap/src/main/java/com/bakdata/kafka/StreamsAppConfiguration.java b/streams-bootstrap/src/main/java/com/bakdata/kafka/AppConfiguration.java similarity index 79% rename from streams-bootstrap/src/main/java/com/bakdata/kafka/StreamsAppConfiguration.java rename to streams-bootstrap/src/main/java/com/bakdata/kafka/AppConfiguration.java index 1de045db..cf891c88 100644 --- a/streams-bootstrap/src/main/java/com/bakdata/kafka/StreamsAppConfiguration.java +++ b/streams-bootstrap/src/main/java/com/bakdata/kafka/AppConfiguration.java @@ -27,21 +27,24 @@ import static java.util.Collections.emptyMap; import java.util.Map; -import lombok.Builder; import lombok.EqualsAndHashCode; import lombok.NonNull; +import lombok.RequiredArgsConstructor; import lombok.Value; /** - * Configuration of {@link StreamsApp}. This includes {@link StreamsTopicConfig} and Kafka configuration + * Configuration of an app. This includes topics and Kafka configuration */ -@Builder @Value +@RequiredArgsConstructor @EqualsAndHashCode -public class StreamsAppConfiguration { - @Builder.Default - @NonNull StreamsTopicConfig topics = StreamsTopicConfig.builder().build(); - @Builder.Default - @NonNull Map kafkaConfig = emptyMap(); +public class AppConfiguration { + @NonNull + T topics; + @NonNull + Map kafkaConfig; + public AppConfiguration(final T topics) { + this(topics, emptyMap()); + } } diff --git a/streams-bootstrap/src/main/java/com/bakdata/kafka/ConfiguredProducerApp.java b/streams-bootstrap/src/main/java/com/bakdata/kafka/ConfiguredProducerApp.java index 68885dd1..60fd9adc 100644 --- a/streams-bootstrap/src/main/java/com/bakdata/kafka/ConfiguredProducerApp.java +++ b/streams-bootstrap/src/main/java/com/bakdata/kafka/ConfiguredProducerApp.java @@ -35,14 +35,14 @@ import org.apache.kafka.common.serialization.StringSerializer; /** - * A {@link ProducerApp} with a corresponding {@link ProducerAppConfiguration} + * A {@link ProducerApp} with a corresponding {@link AppConfiguration} * @param type of {@link ProducerApp} */ @RequiredArgsConstructor public class ConfiguredProducerApp implements ConfiguredApp> { @Getter private final @NonNull T app; - private final @NonNull ProducerAppConfiguration configuration; + private final @NonNull AppConfiguration configuration; private static Map createKafkaProperties(final KafkaEndpointConfig endpointConfig) { final Map kafkaConfig = new HashMap<>(); @@ -89,7 +89,7 @@ private static Map createKafkaProperties(final KafkaEndpointConf * {@link EnvironmentKafkaConfigParser#parseVariables(Map)}) * *
  • - * Configs provided by {@link ProducerAppConfiguration#getKafkaConfig()} + * Configs provided by {@link AppConfiguration#getKafkaConfig()} *
  • *
  • * Configs provided by {@link KafkaEndpointConfig#createKafkaProperties()} diff --git a/streams-bootstrap/src/main/java/com/bakdata/kafka/ConfiguredStreamsApp.java b/streams-bootstrap/src/main/java/com/bakdata/kafka/ConfiguredStreamsApp.java index e7bdcff6..0882d169 100644 --- a/streams-bootstrap/src/main/java/com/bakdata/kafka/ConfiguredStreamsApp.java +++ b/streams-bootstrap/src/main/java/com/bakdata/kafka/ConfiguredStreamsApp.java @@ -38,14 +38,14 @@ import org.apache.kafka.streams.Topology; /** - * A {@link StreamsApp} with a corresponding {@link StreamsAppConfiguration} + * A {@link StreamsApp} with a corresponding {@link AppConfiguration} * @param type of {@link StreamsApp} */ @RequiredArgsConstructor public class ConfiguredStreamsApp implements ConfiguredApp> { @Getter private final @NonNull T app; - private final @NonNull StreamsAppConfiguration configuration; + private final @NonNull AppConfiguration configuration; private static Map createKafkaProperties(final KafkaEndpointConfig endpointConfig) { final Map kafkaConfig = new HashMap<>(); @@ -96,7 +96,7 @@ private static Map createKafkaProperties(final KafkaEndpointConf * {@link EnvironmentKafkaConfigParser#parseVariables(Map)}) *
  • *
  • - * Configs provided by {@link StreamsAppConfiguration#getKafkaConfig()} + * Configs provided by {@link AppConfiguration#getKafkaConfig()} *
  • *
  • * Configs provided by {@link KafkaEndpointConfig#createKafkaProperties()} @@ -145,10 +145,8 @@ public StreamsTopicConfig getTopics() { public ExecutableStreamsApp withEndpoint(final KafkaEndpointConfig endpointConfig) { final Map kafkaProperties = this.getKafkaProperties(endpointConfig); final Topology topology = this.createTopology(kafkaProperties); - final EffectiveStreamsAppConfiguration effectiveConfiguration = EffectiveStreamsAppConfiguration.builder() - .kafkaProperties(kafkaProperties) - .topics(this.getTopics()) - .build(); + final EffectiveAppConfiguration effectiveConfiguration = + new EffectiveAppConfiguration<>(this.getTopics(), kafkaProperties); return ExecutableStreamsApp.builder() .topology(topology) .config(new StreamsConfig(kafkaProperties)) diff --git a/streams-bootstrap/src/main/java/com/bakdata/kafka/EffectiveStreamsAppConfiguration.java b/streams-bootstrap/src/main/java/com/bakdata/kafka/EffectiveAppConfiguration.java similarity index 76% rename from streams-bootstrap/src/main/java/com/bakdata/kafka/EffectiveStreamsAppConfiguration.java rename to streams-bootstrap/src/main/java/com/bakdata/kafka/EffectiveAppConfiguration.java index 5ae8e96c..ef887446 100644 --- a/streams-bootstrap/src/main/java/com/bakdata/kafka/EffectiveStreamsAppConfiguration.java +++ b/streams-bootstrap/src/main/java/com/bakdata/kafka/EffectiveAppConfiguration.java @@ -24,28 +24,26 @@ package com.bakdata.kafka; -import static java.util.Collections.emptyMap; - import com.bakdata.kafka.util.ImprovedAdminClient; import java.util.Map; -import lombok.Builder; import lombok.EqualsAndHashCode; import lombok.NonNull; import lombok.Value; /** - * Configuration for setting up a {@link StreamsApp} - * @see StreamsApp#setup(EffectiveStreamsAppConfiguration) - * @see StreamsApp#setupCleanUp(EffectiveStreamsAppConfiguration) + * Configuration for setting up an app + * @see StreamsApp#setup(EffectiveAppConfiguration) + * @see StreamsApp#setupCleanUp(EffectiveAppConfiguration) + * @see ProducerApp#setup(EffectiveAppConfiguration) + * @see ProducerApp#setupCleanUp(EffectiveAppConfiguration) */ -@Builder @Value @EqualsAndHashCode -public class EffectiveStreamsAppConfiguration { - @Builder.Default - @NonNull StreamsTopicConfig topics = StreamsTopicConfig.builder().build(); - @Builder.Default - @NonNull Map kafkaProperties = emptyMap(); +public class EffectiveAppConfiguration { + @NonNull + T topics; + @NonNull + Map kafkaProperties; /** * Create a new {@code ImprovedAdminClient} using {@link #kafkaProperties} diff --git a/streams-bootstrap/src/main/java/com/bakdata/kafka/EffectiveProducerAppConfiguration.java b/streams-bootstrap/src/main/java/com/bakdata/kafka/EffectiveProducerAppConfiguration.java deleted file mode 100644 index 1f4c600e..00000000 --- a/streams-bootstrap/src/main/java/com/bakdata/kafka/EffectiveProducerAppConfiguration.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2024 bakdata - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.bakdata.kafka; - -import static java.util.Collections.emptyMap; - -import com.bakdata.kafka.util.ImprovedAdminClient; -import java.util.Map; -import lombok.Builder; -import lombok.EqualsAndHashCode; -import lombok.NonNull; -import lombok.Value; - -/** - * Configuration for setting up a {@link ProducerApp} - * @see ProducerApp#setup(EffectiveProducerAppConfiguration) - * @see ProducerApp#setupCleanUp(EffectiveProducerAppConfiguration) - */ -@Builder -@Value -@EqualsAndHashCode -public class EffectiveProducerAppConfiguration { - @Builder.Default - @NonNull ProducerTopicConfig topics = ProducerTopicConfig.builder().build(); - @Builder.Default - @NonNull Map kafkaProperties = emptyMap(); - - /** - * Create a new {@code ImprovedAdminClient} using {@link #kafkaProperties} - * @return {@code ImprovedAdminClient} - */ - public ImprovedAdminClient createAdminClient() { - return ImprovedAdminClient.create(this.kafkaProperties); - } -} diff --git a/streams-bootstrap/src/main/java/com/bakdata/kafka/ExecutableProducerApp.java b/streams-bootstrap/src/main/java/com/bakdata/kafka/ExecutableProducerApp.java index 22a5cd65..85cd7249 100644 --- a/streams-bootstrap/src/main/java/com/bakdata/kafka/ExecutableProducerApp.java +++ b/streams-bootstrap/src/main/java/com/bakdata/kafka/ExecutableProducerApp.java @@ -48,7 +48,7 @@ public class ExecutableProducerApp */ @Override public ProducerCleanUpRunner createCleanUpRunner() { - final EffectiveProducerAppConfiguration configuration = this.createEffectiveConfiguration(); + final EffectiveAppConfiguration configuration = this.createEffectiveConfiguration(); final ProducerCleanUpConfiguration configurer = this.app.setupCleanUp(configuration); return ProducerCleanUpRunner.create(this.topics, this.kafkaProperties, configurer); } @@ -68,7 +68,7 @@ public ProducerRunner createRunner(final ProducerExecutionOptions options) { .topics(this.topics) .kafkaProperties(this.kafkaProperties) .build(); - final EffectiveProducerAppConfiguration configuration = this.createEffectiveConfiguration(); + final EffectiveAppConfiguration configuration = this.createEffectiveConfiguration(); this.app.setup(configuration); return new ProducerRunner(this.app.buildRunnable(producerBuilder)); } @@ -78,10 +78,7 @@ public void close() { this.app.close(); } - private EffectiveProducerAppConfiguration createEffectiveConfiguration() { - return EffectiveProducerAppConfiguration.builder() - .topics(this.topics) - .kafkaProperties(this.kafkaProperties) - .build(); + private EffectiveAppConfiguration createEffectiveConfiguration() { + return new EffectiveAppConfiguration<>(this.topics, this.kafkaProperties); } } diff --git a/streams-bootstrap/src/main/java/com/bakdata/kafka/ProducerApp.java b/streams-bootstrap/src/main/java/com/bakdata/kafka/ProducerApp.java index 9b59604b..04c277f6 100644 --- a/streams-bootstrap/src/main/java/com/bakdata/kafka/ProducerApp.java +++ b/streams-bootstrap/src/main/java/com/bakdata/kafka/ProducerApp.java @@ -45,7 +45,7 @@ public interface ProducerApp extends AutoCloseable { * Setup Kafka resources, such as topics, before running this app * @param configuration provides all runtime application configurations */ - default void setup(final EffectiveProducerAppConfiguration configuration) { + default void setup(final EffectiveAppConfiguration configuration) { // do nothing by default } @@ -63,7 +63,8 @@ default Map createKafkaProperties() { * @return {@code ProducerCleanUpConfiguration} * @see ProducerCleanUpRunner */ - default ProducerCleanUpConfiguration setupCleanUp(final EffectiveProducerAppConfiguration configuration) { + default ProducerCleanUpConfiguration setupCleanUp( + final EffectiveAppConfiguration configuration) { return new ProducerCleanUpConfiguration(); } diff --git a/streams-bootstrap/src/main/java/com/bakdata/kafka/ProducerAppConfiguration.java b/streams-bootstrap/src/main/java/com/bakdata/kafka/ProducerAppConfiguration.java deleted file mode 100644 index ba7a5a86..00000000 --- a/streams-bootstrap/src/main/java/com/bakdata/kafka/ProducerAppConfiguration.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2024 bakdata - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.bakdata.kafka; - -import static java.util.Collections.emptyMap; - -import java.util.Map; -import lombok.Builder; -import lombok.EqualsAndHashCode; -import lombok.NonNull; -import lombok.Value; - -/** - * Configuration of {@link ProducerApp}. This includes {@link ProducerTopicConfig} and Kafka configuration - */ -@Builder -@Value -@EqualsAndHashCode -public class ProducerAppConfiguration { - @Builder.Default - @NonNull ProducerTopicConfig topics = ProducerTopicConfig.builder().build(); - @Builder.Default - @NonNull Map kafkaConfig = emptyMap(); - -} diff --git a/streams-bootstrap/src/main/java/com/bakdata/kafka/ProducerBuilder.java b/streams-bootstrap/src/main/java/com/bakdata/kafka/ProducerBuilder.java index 250443fe..f53e7fdf 100644 --- a/streams-bootstrap/src/main/java/com/bakdata/kafka/ProducerBuilder.java +++ b/streams-bootstrap/src/main/java/com/bakdata/kafka/ProducerBuilder.java @@ -82,13 +82,10 @@ public Configurator createConfigurator() { } /** - * Create {@code EffectiveProducerAppConfiguration} used by this app - * @return {@code EffectiveProducerAppConfiguration} + * Create {@code EffectiveAppConfiguration} used by this app + * @return {@code EffectiveAppConfiguration} */ - public EffectiveProducerAppConfiguration createEffectiveConfiguration() { - return EffectiveProducerAppConfiguration.builder() - .kafkaProperties(this.kafkaProperties) - .topics(this.topics) - .build(); + public EffectiveAppConfiguration createEffectiveConfiguration() { + return new EffectiveAppConfiguration<>(this.topics, this.kafkaProperties); } } diff --git a/streams-bootstrap/src/main/java/com/bakdata/kafka/ProducerRunner.java b/streams-bootstrap/src/main/java/com/bakdata/kafka/ProducerRunner.java index 5ea75176..2c55a987 100644 --- a/streams-bootstrap/src/main/java/com/bakdata/kafka/ProducerRunner.java +++ b/streams-bootstrap/src/main/java/com/bakdata/kafka/ProducerRunner.java @@ -26,22 +26,26 @@ import lombok.NonNull; import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; /** * Runs a Kafka Producer application */ @RequiredArgsConstructor +@Slf4j public class ProducerRunner implements Runner { private final @NonNull ProducerRunnable runnable; @Override public void close() { + log.info("Closing producer"); this.runnable.close(); } @Override public void run() { + log.info("Starting producer"); this.runnable.run(); } } diff --git a/streams-bootstrap/src/main/java/com/bakdata/kafka/StreamsApp.java b/streams-bootstrap/src/main/java/com/bakdata/kafka/StreamsApp.java index bdbb19fc..3afc15fc 100644 --- a/streams-bootstrap/src/main/java/com/bakdata/kafka/StreamsApp.java +++ b/streams-bootstrap/src/main/java/com/bakdata/kafka/StreamsApp.java @@ -54,7 +54,7 @@ public interface StreamsApp extends AutoCloseable { * Setup Kafka resources, such as topics, before running this app * @param configuration provides all runtime application configurations */ - default void setup(final EffectiveStreamsAppConfiguration configuration) { + default void setup(final EffectiveAppConfiguration configuration) { // do nothing by default } @@ -72,7 +72,8 @@ default Map createKafkaProperties() { * @return {@code StreamsCleanUpConfiguration} * @see StreamsCleanUpRunner */ - default StreamsCleanUpConfiguration setupCleanUp(final EffectiveStreamsAppConfiguration configuration) { + default StreamsCleanUpConfiguration setupCleanUp( + final EffectiveAppConfiguration configuration) { return new StreamsCleanUpConfiguration(); } diff --git a/streams-bootstrap/src/main/java/com/bakdata/kafka/TopologyBuilder.java b/streams-bootstrap/src/main/java/com/bakdata/kafka/TopologyBuilder.java index bf2dc156..56774ab3 100644 --- a/streams-bootstrap/src/main/java/com/bakdata/kafka/TopologyBuilder.java +++ b/streams-bootstrap/src/main/java/com/bakdata/kafka/TopologyBuilder.java @@ -147,14 +147,11 @@ public Configurator createConfigurator() { } /** - * Create {@code EffectiveStreamsAppConfiguration} used by this app - * @return {@code EffectiveStreamsAppConfiguration} + * Create {@code EffectiveAppConfiguration} used by this app + * @return {@code EffectiveAppConfiguration} */ - public EffectiveStreamsAppConfiguration createEffectiveConfiguration() { - return EffectiveStreamsAppConfiguration.builder() - .kafkaProperties(this.kafkaProperties) - .topics(this.topics) - .build(); + public EffectiveAppConfiguration createEffectiveConfiguration() { + return new EffectiveAppConfiguration<>(this.topics, this.kafkaProperties); } Topology build() { diff --git a/streams-bootstrap/src/test/java/com/bakdata/kafka/AvroMirrorTest.java b/streams-bootstrap/src/test/java/com/bakdata/kafka/AvroMirrorTest.java index 61b7acae..d41ccd0e 100644 --- a/streams-bootstrap/src/test/java/com/bakdata/kafka/AvroMirrorTest.java +++ b/streams-bootstrap/src/test/java/com/bakdata/kafka/AvroMirrorTest.java @@ -38,12 +38,10 @@ class AvroMirrorTest { StreamsBootstrapTopologyFactory.createTopologyExtensionWithSchemaRegistry(this.app); private static ConfiguredStreamsApp createApp() { - final StreamsAppConfiguration configuration = StreamsAppConfiguration.builder() - .topics(StreamsTopicConfig.builder() + final AppConfiguration configuration = new AppConfiguration<>(StreamsTopicConfig.builder() .inputTopics(List.of("input")) .outputTopic("output") - .build()) - .build(); + .build()); return new ConfiguredStreamsApp<>(new MirrorWithNonDefaultSerde(), configuration); } diff --git a/streams-bootstrap/src/test/java/com/bakdata/kafka/ConfiguredProducerAppTest.java b/streams-bootstrap/src/test/java/com/bakdata/kafka/ConfiguredProducerAppTest.java index b17492a7..57dbccb3 100644 --- a/streams-bootstrap/src/test/java/com/bakdata/kafka/ConfiguredProducerAppTest.java +++ b/streams-bootstrap/src/test/java/com/bakdata/kafka/ConfiguredProducerAppTest.java @@ -36,14 +36,20 @@ class ConfiguredProducerAppTest { + private static AppConfiguration newAppConfiguration() { + return new AppConfiguration<>(emptyTopicConfig()); + } + + private static ProducerTopicConfig emptyTopicConfig() { + return ProducerTopicConfig.builder().build(); + } + @Test void shouldPrioritizeConfigCLIParameters() { - final ProducerAppConfiguration configuration = ProducerAppConfiguration.builder() - .kafkaConfig(Map.of( - "foo", "baz", - "kafka", "streams" - )) - .build(); + final AppConfiguration configuration = new AppConfiguration<>(emptyTopicConfig(), Map.of( + "foo", "baz", + "kafka", "streams" + )); final ConfiguredProducerApp configuredApp = new ConfiguredProducerApp<>(new TestProducer(), configuration); assertThat(configuredApp.getKafkaProperties(KafkaEndpointConfig.builder() @@ -58,8 +64,7 @@ void shouldPrioritizeConfigCLIParameters() { @SetEnvironmentVariable(key = "KAFKA_FOO", value = "baz") @SetEnvironmentVariable(key = "KAFKA_KAFKA", value = "streams") void shouldPrioritizeEnvironmentConfigs() { - final ProducerAppConfiguration configuration = ProducerAppConfiguration.builder() - .build(); + final AppConfiguration configuration = newAppConfiguration(); final ConfiguredProducerApp configuredApp = new ConfiguredProducerApp<>(new TestProducer(), configuration); assertThat(configuredApp.getKafkaProperties(KafkaEndpointConfig.builder() @@ -72,8 +77,7 @@ void shouldPrioritizeEnvironmentConfigs() { @Test void shouldSetDefaultAvroSerializerWhenSchemaRegistryUrlIsSet() { - final ProducerAppConfiguration configuration = ProducerAppConfiguration.builder() - .build(); + final AppConfiguration configuration = newAppConfiguration(); final ConfiguredProducerApp configuredApp = new ConfiguredProducerApp<>(new TestProducer(), configuration); assertThat(configuredApp.getKafkaProperties(KafkaEndpointConfig.builder() @@ -86,8 +90,7 @@ void shouldSetDefaultAvroSerializerWhenSchemaRegistryUrlIsSet() { @Test void shouldSetDefaultStringSerializerWhenSchemaRegistryUrlIsNotSet() { - final ProducerAppConfiguration configuration = ProducerAppConfiguration.builder() - .build(); + final AppConfiguration configuration = newAppConfiguration(); final ConfiguredProducerApp configuredApp = new ConfiguredProducerApp<>(new TestProducer(), configuration); assertThat(configuredApp.getKafkaProperties(KafkaEndpointConfig.builder() diff --git a/streams-bootstrap/src/test/java/com/bakdata/kafka/ConfiguredStreamsAppTest.java b/streams-bootstrap/src/test/java/com/bakdata/kafka/ConfiguredStreamsAppTest.java index 257dfc05..b1b3c648 100644 --- a/streams-bootstrap/src/test/java/com/bakdata/kafka/ConfiguredStreamsAppTest.java +++ b/streams-bootstrap/src/test/java/com/bakdata/kafka/ConfiguredStreamsAppTest.java @@ -37,14 +37,20 @@ class ConfiguredStreamsAppTest { + private static StreamsTopicConfig emptyTopicConfig() { + return StreamsTopicConfig.builder().build(); + } + + private static AppConfiguration newAppConfiguration() { + return new AppConfiguration<>(emptyTopicConfig()); + } + @Test void shouldPrioritizeConfigCLIParameters() { - final StreamsAppConfiguration configuration = StreamsAppConfiguration.builder() - .kafkaConfig(Map.of( - "foo", "baz", - "kafka", "streams" - )) - .build(); + final AppConfiguration configuration = new AppConfiguration<>(emptyTopicConfig(), Map.of( + "foo", "baz", + "kafka", "streams" + )); final ConfiguredStreamsApp configuredApp = new ConfiguredStreamsApp<>(new TestApplication(), configuration); assertThat(configuredApp.getKafkaProperties(KafkaEndpointConfig.builder() @@ -59,8 +65,7 @@ void shouldPrioritizeConfigCLIParameters() { @SetEnvironmentVariable(key = "KAFKA_FOO", value = "baz") @SetEnvironmentVariable(key = "KAFKA_KAFKA", value = "streams") void shouldPrioritizeEnvironmentConfigs() { - final StreamsAppConfiguration configuration = StreamsAppConfiguration.builder() - .build(); + final AppConfiguration configuration = newAppConfiguration(); final ConfiguredStreamsApp configuredApp = new ConfiguredStreamsApp<>(new TestApplication(), configuration); assertThat(configuredApp.getKafkaProperties(KafkaEndpointConfig.builder() @@ -73,8 +78,7 @@ void shouldPrioritizeEnvironmentConfigs() { @Test void shouldSetDefaultAvroSerdeWhenSchemaRegistryUrlIsSet() { - final StreamsAppConfiguration configuration = StreamsAppConfiguration.builder() - .build(); + final AppConfiguration configuration = newAppConfiguration(); final ConfiguredStreamsApp configuredApp = new ConfiguredStreamsApp<>(new TestApplication(), configuration); assertThat(configuredApp.getKafkaProperties(KafkaEndpointConfig.builder() @@ -88,8 +92,7 @@ void shouldSetDefaultAvroSerdeWhenSchemaRegistryUrlIsSet() { @Test void shouldSetDefaultStringSerdeWhenSchemaRegistryUrlIsNotSet() { - final StreamsAppConfiguration configuration = StreamsAppConfiguration.builder() - .build(); + final AppConfiguration configuration = newAppConfiguration(); final ConfiguredStreamsApp configuredApp = new ConfiguredStreamsApp<>(new TestApplication(), configuration); assertThat(configuredApp.getKafkaProperties(KafkaEndpointConfig.builder() diff --git a/streams-bootstrap/src/test/java/com/bakdata/kafka/ExecutableProducerAppTest.java b/streams-bootstrap/src/test/java/com/bakdata/kafka/ExecutableProducerAppTest.java index 98124429..03fbe575 100644 --- a/streams-bootstrap/src/test/java/com/bakdata/kafka/ExecutableProducerAppTest.java +++ b/streams-bootstrap/src/test/java/com/bakdata/kafka/ExecutableProducerAppTest.java @@ -42,7 +42,7 @@ class ExecutableProducerAppTest { @Mock - private Consumer setup; + private Consumer> setup; @Mock private Supplier setupCleanUp; @@ -51,9 +51,7 @@ void shouldCallSetupWhenCreatingRunner() { final ProducerTopicConfig topics = ProducerTopicConfig.builder() .outputTopic("output") .build(); - final ProducerAppConfiguration configuration = ProducerAppConfiguration.builder() - .topics(topics) - .build(); + final AppConfiguration configuration = new AppConfiguration<>(topics); final ConfiguredProducerApp configuredApp = new ConfiguredProducerApp<>(new TestProducer(), configuration); final KafkaEndpointConfig endpointConfig = KafkaEndpointConfig.builder() @@ -62,10 +60,7 @@ void shouldCallSetupWhenCreatingRunner() { final ExecutableProducerApp executableApp = configuredApp.withEndpoint(endpointConfig); final Map kafkaProperties = configuredApp.getKafkaProperties(endpointConfig); executableApp.createRunner(); - verify(this.setup).accept(EffectiveProducerAppConfiguration.builder() - .kafkaProperties(kafkaProperties) - .topics(topics) - .build()); + verify(this.setup).accept(new EffectiveAppConfiguration<>(topics, kafkaProperties)); } @Test @@ -73,9 +68,7 @@ void shouldCallSetupWhenCreatingRunnerWithOptions() { final ProducerTopicConfig topics = ProducerTopicConfig.builder() .outputTopic("output") .build(); - final ProducerAppConfiguration configuration = ProducerAppConfiguration.builder() - .topics(topics) - .build(); + final AppConfiguration configuration = new AppConfiguration<>(topics); final ConfiguredProducerApp configuredApp = new ConfiguredProducerApp<>(new TestProducer(), configuration); final KafkaEndpointConfig endpointConfig = KafkaEndpointConfig.builder() @@ -84,10 +77,7 @@ void shouldCallSetupWhenCreatingRunnerWithOptions() { final ExecutableProducerApp executableApp = configuredApp.withEndpoint(endpointConfig); final Map kafkaProperties = configuredApp.getKafkaProperties(endpointConfig); executableApp.createRunner(ProducerExecutionOptions.builder().build()); - verify(this.setup).accept(EffectiveProducerAppConfiguration.builder() - .kafkaProperties(kafkaProperties) - .topics(topics) - .build()); + verify(this.setup).accept(new EffectiveAppConfiguration<>(topics, kafkaProperties)); } @Test @@ -95,9 +85,7 @@ void shouldCallSetupCleanUpWhenCreatingCleanUpRunner() { final ProducerTopicConfig topics = ProducerTopicConfig.builder() .outputTopic("output") .build(); - final ProducerAppConfiguration configuration = ProducerAppConfiguration.builder() - .topics(topics) - .build(); + final AppConfiguration configuration = new AppConfiguration<>(topics); final ConfiguredProducerApp configuredApp = new ConfiguredProducerApp<>(new TestProducer(), configuration); final KafkaEndpointConfig endpointConfig = KafkaEndpointConfig.builder() @@ -112,12 +100,13 @@ void shouldCallSetupCleanUpWhenCreatingCleanUpRunner() { private class TestProducer implements ProducerApp { @Override - public void setup(final EffectiveProducerAppConfiguration configuration) { + public void setup(final EffectiveAppConfiguration configuration) { ExecutableProducerAppTest.this.setup.accept(configuration); } @Override - public ProducerCleanUpConfiguration setupCleanUp(final EffectiveProducerAppConfiguration configuration) { + public ProducerCleanUpConfiguration setupCleanUp( + final EffectiveAppConfiguration configuration) { return ExecutableProducerAppTest.this.setupCleanUp.get(); } diff --git a/streams-bootstrap/src/test/java/com/bakdata/kafka/ExecutableStreamsAppTest.java b/streams-bootstrap/src/test/java/com/bakdata/kafka/ExecutableStreamsAppTest.java index f13d199a..c1ff3fbf 100644 --- a/streams-bootstrap/src/test/java/com/bakdata/kafka/ExecutableStreamsAppTest.java +++ b/streams-bootstrap/src/test/java/com/bakdata/kafka/ExecutableStreamsAppTest.java @@ -43,7 +43,7 @@ class ExecutableStreamsAppTest { @Mock - private Consumer setup; + private Consumer> setup; @Mock private Supplier setupCleanUp; @@ -53,9 +53,7 @@ void shouldCallSetupWhenCreatingRunner() { .inputTopics(List.of("input")) .outputTopic("output") .build(); - final StreamsAppConfiguration configuration = StreamsAppConfiguration.builder() - .topics(topics) - .build(); + final AppConfiguration configuration = new AppConfiguration<>(topics); final ConfiguredStreamsApp configuredApp = new ConfiguredStreamsApp<>(new TestApplication(), configuration); final KafkaEndpointConfig endpointConfig = KafkaEndpointConfig.builder() @@ -64,10 +62,7 @@ void shouldCallSetupWhenCreatingRunner() { final ExecutableStreamsApp executableApp = configuredApp.withEndpoint(endpointConfig); final Map kafkaProperties = configuredApp.getKafkaProperties(endpointConfig); executableApp.createRunner(); - verify(this.setup).accept(EffectiveStreamsAppConfiguration.builder() - .kafkaProperties(kafkaProperties) - .topics(topics) - .build()); + verify(this.setup).accept(new EffectiveAppConfiguration<>(topics, kafkaProperties)); } @Test @@ -76,9 +71,7 @@ void shouldCallSetupWhenCreatingRunnerWithOptions() { .inputTopics(List.of("input")) .outputTopic("output") .build(); - final StreamsAppConfiguration configuration = StreamsAppConfiguration.builder() - .topics(topics) - .build(); + final AppConfiguration configuration = new AppConfiguration<>(topics); final ConfiguredStreamsApp configuredApp = new ConfiguredStreamsApp<>(new TestApplication(), configuration); final KafkaEndpointConfig endpointConfig = KafkaEndpointConfig.builder() @@ -87,10 +80,7 @@ void shouldCallSetupWhenCreatingRunnerWithOptions() { final ExecutableStreamsApp executableApp = configuredApp.withEndpoint(endpointConfig); final Map kafkaProperties = configuredApp.getKafkaProperties(endpointConfig); executableApp.createRunner(StreamsExecutionOptions.builder().build()); - verify(this.setup).accept(EffectiveStreamsAppConfiguration.builder() - .kafkaProperties(kafkaProperties) - .topics(topics) - .build()); + verify(this.setup).accept(new EffectiveAppConfiguration<>(topics, kafkaProperties)); } @Test @@ -99,9 +89,7 @@ void shouldCallSetupCleanUpWhenCreatingCleanUpRunner() { .inputTopics(List.of("input")) .outputTopic("output") .build(); - final StreamsAppConfiguration configuration = StreamsAppConfiguration.builder() - .topics(topics) - .build(); + final AppConfiguration configuration = new AppConfiguration<>(topics); final ConfiguredStreamsApp configuredApp = new ConfiguredStreamsApp<>(new TestApplication(), configuration); final KafkaEndpointConfig endpointConfig = KafkaEndpointConfig.builder() @@ -116,12 +104,13 @@ void shouldCallSetupCleanUpWhenCreatingCleanUpRunner() { private class TestApplication implements StreamsApp { @Override - public void setup(final EffectiveStreamsAppConfiguration configuration) { + public void setup(final EffectiveAppConfiguration configuration) { ExecutableStreamsAppTest.this.setup.accept(configuration); } @Override - public StreamsCleanUpConfiguration setupCleanUp(final EffectiveStreamsAppConfiguration setupConfiguration) { + public StreamsCleanUpConfiguration setupCleanUp( + final EffectiveAppConfiguration setupConfiguration) { return ExecutableStreamsAppTest.this.setupCleanUp.get(); } diff --git a/streams-bootstrap/src/test/java/com/bakdata/kafka/integration/ProducerCleanUpRunnerTest.java b/streams-bootstrap/src/test/java/com/bakdata/kafka/integration/ProducerCleanUpRunnerTest.java index 01b2acb4..05f3381c 100644 --- a/streams-bootstrap/src/test/java/com/bakdata/kafka/integration/ProducerCleanUpRunnerTest.java +++ b/streams-bootstrap/src/test/java/com/bakdata/kafka/integration/ProducerCleanUpRunnerTest.java @@ -31,7 +31,7 @@ import com.bakdata.kafka.CleanUpRunner; import com.bakdata.kafka.ConfiguredProducerApp; -import com.bakdata.kafka.EffectiveProducerAppConfiguration; +import com.bakdata.kafka.EffectiveAppConfiguration; import com.bakdata.kafka.ExecutableApp; import com.bakdata.kafka.ExecutableProducerApp; import com.bakdata.kafka.HasTopicHooks.TopicHook; @@ -157,7 +157,8 @@ void shouldCallCleanUpHookForAllTopics() { private ConfiguredProducerApp createCleanUpHookApplication() { return configureApp(new StringProducer() { @Override - public ProducerCleanUpConfiguration setupCleanUp(final EffectiveProducerAppConfiguration configuration) { + public ProducerCleanUpConfiguration setupCleanUp( + final EffectiveAppConfiguration configuration) { return super.setupCleanUp(configuration) .registerTopicHook(ProducerCleanUpRunnerTest.this.topicHook); } diff --git a/streams-bootstrap/src/test/java/com/bakdata/kafka/integration/ProducerRunnerTest.java b/streams-bootstrap/src/test/java/com/bakdata/kafka/integration/ProducerRunnerTest.java index 3e0dfd91..1c0b23bd 100644 --- a/streams-bootstrap/src/test/java/com/bakdata/kafka/integration/ProducerRunnerTest.java +++ b/streams-bootstrap/src/test/java/com/bakdata/kafka/integration/ProducerRunnerTest.java @@ -26,9 +26,9 @@ import static com.bakdata.kafka.integration.ProducerCleanUpRunnerTest.createStringApplication; +import com.bakdata.kafka.AppConfiguration; import com.bakdata.kafka.ConfiguredProducerApp; import com.bakdata.kafka.ProducerApp; -import com.bakdata.kafka.ProducerAppConfiguration; import com.bakdata.kafka.ProducerRunner; import com.bakdata.kafka.ProducerTopicConfig; import java.util.List; @@ -51,9 +51,7 @@ class ProducerRunnerTest extends KafkaTest { private SoftAssertions softly; static ConfiguredProducerApp configureApp(final ProducerApp app, final ProducerTopicConfig topics) { - final ProducerAppConfiguration configuration = ProducerAppConfiguration.builder() - .topics(topics) - .build(); + final AppConfiguration configuration = new AppConfiguration<>(topics); return new ConfiguredProducerApp<>(app, configuration); } diff --git a/streams-bootstrap/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpRunnerTest.java b/streams-bootstrap/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpRunnerTest.java index ee9c9121..a80ae6cc 100644 --- a/streams-bootstrap/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpRunnerTest.java +++ b/streams-bootstrap/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpRunnerTest.java @@ -33,7 +33,7 @@ import com.bakdata.kafka.CleanUpException; import com.bakdata.kafka.CleanUpRunner; import com.bakdata.kafka.ConfiguredStreamsApp; -import com.bakdata.kafka.EffectiveStreamsAppConfiguration; +import com.bakdata.kafka.EffectiveAppConfiguration; import com.bakdata.kafka.ExecutableApp; import com.bakdata.kafka.ExecutableStreamsApp; import com.bakdata.kafka.HasTopicHooks.TopicHook; @@ -604,7 +604,8 @@ private ConfiguredStreamsApp createComplexCleanUpHookApplication() { this.kafkaCluster.createTopic(TopicConfig.withName(ComplexTopologyApplication.THROUGH_TOPIC).useDefaults()); return configureApp(new ComplexTopologyApplication() { @Override - public StreamsCleanUpConfiguration setupCleanUp(final EffectiveStreamsAppConfiguration configuration) { + public StreamsCleanUpConfiguration setupCleanUp( + final EffectiveAppConfiguration configuration) { return super.setupCleanUp(configuration) .registerTopicHook(StreamsCleanUpRunnerTest.this.topicHook); } diff --git a/streams-bootstrap/src/test/java/com/bakdata/kafka/integration/StreamsRunnerTest.java b/streams-bootstrap/src/test/java/com/bakdata/kafka/integration/StreamsRunnerTest.java index 0d178c03..03a5f6dd 100644 --- a/streams-bootstrap/src/test/java/com/bakdata/kafka/integration/StreamsRunnerTest.java +++ b/streams-bootstrap/src/test/java/com/bakdata/kafka/integration/StreamsRunnerTest.java @@ -29,9 +29,9 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import com.bakdata.kafka.AppConfiguration; import com.bakdata.kafka.ConfiguredStreamsApp; import com.bakdata.kafka.StreamsApp; -import com.bakdata.kafka.StreamsAppConfiguration; import com.bakdata.kafka.StreamsExecutionOptions; import com.bakdata.kafka.StreamsRunner; import com.bakdata.kafka.StreamsTopicConfig; @@ -91,13 +91,10 @@ static Thread run(final StreamsRunner runner) { } static ConfiguredStreamsApp configureApp(final StreamsApp app, final StreamsTopicConfig topics) { - final StreamsAppConfiguration configuration = StreamsAppConfiguration.builder() - .kafkaConfig(Map.of( + final AppConfiguration configuration = new AppConfiguration<>(topics, Map.of( StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0", ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000" - )) - .topics(topics) - .build(); + )); return new ConfiguredStreamsApp<>(app, configuration); } diff --git a/streams-bootstrap/src/test/java/com/bakdata/kafka/util/TopologyInformationTest.java b/streams-bootstrap/src/test/java/com/bakdata/kafka/util/TopologyInformationTest.java index 5f5104d7..9951285d 100644 --- a/streams-bootstrap/src/test/java/com/bakdata/kafka/util/TopologyInformationTest.java +++ b/streams-bootstrap/src/test/java/com/bakdata/kafka/util/TopologyInformationTest.java @@ -26,10 +26,10 @@ import static org.assertj.core.api.Assertions.assertThat; +import com.bakdata.kafka.AppConfiguration; import com.bakdata.kafka.ConfiguredStreamsApp; import com.bakdata.kafka.KafkaEndpointConfig; import com.bakdata.kafka.StreamsApp; -import com.bakdata.kafka.StreamsAppConfiguration; import com.bakdata.kafka.StreamsTopicConfig; import com.bakdata.kafka.test_applications.ComplexTopologyApplication; import java.util.List; @@ -57,9 +57,7 @@ void setup() { .inputTopics(List.of("input", "input2")) .outputTopic("output") .build(); - final StreamsAppConfiguration configuration = StreamsAppConfiguration.builder() - .topics(this.topics) - .build(); + final AppConfiguration configuration = new AppConfiguration<>(this.topics); final ConfiguredStreamsApp configuredApp = new ConfiguredStreamsApp<>(this.app, configuration); final Map kafkaProperties = configuredApp.getKafkaProperties( KafkaEndpointConfig.builder()