From 33d841ac8b956630cf930e470efba83ab08806b5 Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Thu, 4 Apr 2024 14:50:10 +0200 Subject: [PATCH] Create v3 --- .../bakdata/kafka/ConfiguredStreamsApp.java | 15 +++++- .../bakdata/kafka/ExecutableProducerApp.java | 9 ++++ .../bakdata/kafka/ExecutableStreamsApp.java | 11 +++- .../java/com/bakdata/kafka/ProducerApp.java | 4 ++ .../kafka/ProducerAppSetupConfiguration.java | 50 +++++++++++++++++++ .../com/bakdata/kafka/ProducerBuilder.java | 9 ---- .../java/com/bakdata/kafka/StreamsApp.java | 4 ++ .../kafka/StreamsAppSetupConfiguration.java | 50 +++++++++++++++++++ .../com/bakdata/kafka/TopologyBuilder.java | 9 ---- 9 files changed, 140 insertions(+), 21 deletions(-) create mode 100644 streams-bootstrap/src/main/java/com/bakdata/kafka/ProducerAppSetupConfiguration.java create mode 100644 streams-bootstrap/src/main/java/com/bakdata/kafka/StreamsAppSetupConfiguration.java diff --git a/streams-bootstrap/src/main/java/com/bakdata/kafka/ConfiguredStreamsApp.java b/streams-bootstrap/src/main/java/com/bakdata/kafka/ConfiguredStreamsApp.java index 21fc57e2..3fd0683e 100644 --- a/streams-bootstrap/src/main/java/com/bakdata/kafka/ConfiguredStreamsApp.java +++ b/streams-bootstrap/src/main/java/com/bakdata/kafka/ConfiguredStreamsApp.java @@ -116,7 +116,12 @@ public StreamsTopicConfig getTopics() { public ExecutableStreamsApp withEndpoint(final KafkaEndpointConfig endpointConfig) { final Map kafkaProperties = this.getKafkaProperties(endpointConfig); final Topology topology = this.createTopology(kafkaProperties); - return new ExecutableStreamsApp<>(topology, new StreamsConfig(kafkaProperties), this.app); + return ExecutableStreamsApp.builder() + .topology(topology) + .streamsConfig(new StreamsConfig(kafkaProperties)) + .app(this.app) + .setup(() -> this.setupApp(kafkaProperties)) + .build(); } /** @@ -139,4 +144,12 @@ public void close() { this.app.close(); } + private void setupApp(final Map kafkaProperties) { + final StreamsAppSetupConfiguration configuration = StreamsAppSetupConfiguration.builder() + .kafkaProperties(kafkaProperties) + .topics(this.getTopics()) + .build(); + this.app.setup(configuration); + } + } diff --git a/streams-bootstrap/src/main/java/com/bakdata/kafka/ExecutableProducerApp.java b/streams-bootstrap/src/main/java/com/bakdata/kafka/ExecutableProducerApp.java index f38bf959..af8e5325 100644 --- a/streams-bootstrap/src/main/java/com/bakdata/kafka/ExecutableProducerApp.java +++ b/streams-bootstrap/src/main/java/com/bakdata/kafka/ExecutableProducerApp.java @@ -59,6 +59,7 @@ public ProducerRunner createRunner() { .topics(this.topics) .kafkaProperties(this.kafkaProperties) .build(); + this.setup(); return new ProducerRunner(() -> this.app.run(producerBuilder)); } @@ -66,4 +67,12 @@ public ProducerRunner createRunner() { public void close() { this.app.close(); } + + private void setup() { + final ProducerAppSetupConfiguration configuration = ProducerAppSetupConfiguration.builder() + .topics(this.topics) + .kafkaProperties(this.kafkaProperties) + .build(); + this.app.setup(configuration); + } } diff --git a/streams-bootstrap/src/main/java/com/bakdata/kafka/ExecutableStreamsApp.java b/streams-bootstrap/src/main/java/com/bakdata/kafka/ExecutableStreamsApp.java index 405a3edb..149b3537 100644 --- a/streams-bootstrap/src/main/java/com/bakdata/kafka/ExecutableStreamsApp.java +++ b/streams-bootstrap/src/main/java/com/bakdata/kafka/ExecutableStreamsApp.java @@ -24,9 +24,10 @@ package com.bakdata.kafka; +import lombok.AccessLevel; +import lombok.Builder; import lombok.Getter; import lombok.NonNull; -import lombok.RequiredArgsConstructor; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; @@ -34,13 +35,18 @@ * A {@link StreamsApp} with a corresponding {@link Topology} and {@link StreamsConfig} * @param type of {@link ProducerApp} */ -@RequiredArgsConstructor +@Builder(access = AccessLevel.PACKAGE) @Getter public class ExecutableStreamsApp implements AutoCloseable { + @Getter private final @NonNull Topology topology; + @Getter private final @NonNull StreamsConfig streamsConfig; + @Getter private final @NonNull T app; + @Builder.Default + private final @NonNull Runnable setup = () -> {}; /** * Create {@code StreamsCleanUpRunner} in order to clean application @@ -67,6 +73,7 @@ public StreamsRunner createRunner() { * @see StreamsRunner#StreamsRunner(Topology, StreamsConfig, StreamsExecutionOptions) */ public StreamsRunner createRunner(final StreamsExecutionOptions executionOptions) { + this.setup.run(); return new StreamsRunner(this.topology, this.streamsConfig, executionOptions); } diff --git a/streams-bootstrap/src/main/java/com/bakdata/kafka/ProducerApp.java b/streams-bootstrap/src/main/java/com/bakdata/kafka/ProducerApp.java index 46c2b06b..8fb52d0a 100644 --- a/streams-bootstrap/src/main/java/com/bakdata/kafka/ProducerApp.java +++ b/streams-bootstrap/src/main/java/com/bakdata/kafka/ProducerApp.java @@ -34,6 +34,10 @@ @FunctionalInterface public interface ProducerApp extends AutoCloseable { + default void setup(final ProducerAppSetupConfiguration configuration) { + // do nothing by default + } + /** * Called when running the producer * @param builder provides all runtime application configurations diff --git a/streams-bootstrap/src/main/java/com/bakdata/kafka/ProducerAppSetupConfiguration.java b/streams-bootstrap/src/main/java/com/bakdata/kafka/ProducerAppSetupConfiguration.java new file mode 100644 index 00000000..cf323a5f --- /dev/null +++ b/streams-bootstrap/src/main/java/com/bakdata/kafka/ProducerAppSetupConfiguration.java @@ -0,0 +1,50 @@ +/* + * MIT License + * + * Copyright (c) 2024 bakdata + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.bakdata.kafka; + +import static java.util.Collections.emptyMap; + +import com.bakdata.kafka.util.ImprovedAdminClient; +import java.util.Map; +import lombok.Builder; +import lombok.NonNull; +import lombok.Value; + +@Builder +@Value +public class ProducerAppSetupConfiguration { + @Builder.Default + @NonNull ProducerTopicConfig topics = ProducerTopicConfig.builder().build(); + @Builder.Default + @NonNull Map kafkaProperties = emptyMap(); + + /** + * Create a new {@code ImprovedAdminClient} using {@link #kafkaProperties} + * @return {@code ImprovedAdminClient} + */ + public ImprovedAdminClient createAdminClient() { + return ImprovedAdminClient.create(this.kafkaProperties); + } +} diff --git a/streams-bootstrap/src/main/java/com/bakdata/kafka/ProducerBuilder.java b/streams-bootstrap/src/main/java/com/bakdata/kafka/ProducerBuilder.java index 7432c421..8a020975 100644 --- a/streams-bootstrap/src/main/java/com/bakdata/kafka/ProducerBuilder.java +++ b/streams-bootstrap/src/main/java/com/bakdata/kafka/ProducerBuilder.java @@ -24,7 +24,6 @@ package com.bakdata.kafka; -import com.bakdata.kafka.util.ImprovedAdminClient; import java.util.Map; import lombok.AccessLevel; import lombok.Builder; @@ -70,12 +69,4 @@ public Producer createProducer(final Serializer keySerializer, final Serializer valueSerializer) { return new KafkaProducer<>(this.kafkaProperties, keySerializer, valueSerializer); } - - /** - * Create a new {@code ImprovedAdminClient} using {@link #kafkaProperties} - * @return {@code ImprovedAdminClient} - */ - public ImprovedAdminClient createAdminClient() { - return ImprovedAdminClient.create(this.kafkaProperties); - } } diff --git a/streams-bootstrap/src/main/java/com/bakdata/kafka/StreamsApp.java b/streams-bootstrap/src/main/java/com/bakdata/kafka/StreamsApp.java index 4a6336f6..bcf14466 100644 --- a/streams-bootstrap/src/main/java/com/bakdata/kafka/StreamsApp.java +++ b/streams-bootstrap/src/main/java/com/bakdata/kafka/StreamsApp.java @@ -35,6 +35,10 @@ public interface StreamsApp extends AutoCloseable { int DEFAULT_PRODUCTIVE_REPLICATION_FACTOR = 3; + default void setup(final StreamsAppSetupConfiguration configuration) { + // do nothing by default + } + /** * Build the Kafka Streams {@link org.apache.kafka.streams.Topology} to be run by the app. * diff --git a/streams-bootstrap/src/main/java/com/bakdata/kafka/StreamsAppSetupConfiguration.java b/streams-bootstrap/src/main/java/com/bakdata/kafka/StreamsAppSetupConfiguration.java new file mode 100644 index 00000000..05a18a25 --- /dev/null +++ b/streams-bootstrap/src/main/java/com/bakdata/kafka/StreamsAppSetupConfiguration.java @@ -0,0 +1,50 @@ +/* + * MIT License + * + * Copyright (c) 2024 bakdata + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.bakdata.kafka; + +import static java.util.Collections.emptyMap; + +import com.bakdata.kafka.util.ImprovedAdminClient; +import java.util.Map; +import lombok.Builder; +import lombok.NonNull; +import lombok.Value; + +@Builder +@Value +public class StreamsAppSetupConfiguration { + @Builder.Default + @NonNull StreamsTopicConfig topics = StreamsTopicConfig.builder().build(); + @Builder.Default + @NonNull Map kafkaProperties = emptyMap(); + + /** + * Create a new {@code ImprovedAdminClient} using {@link #kafkaProperties} + * @return {@code ImprovedAdminClient} + */ + public ImprovedAdminClient createAdminClient() { + return ImprovedAdminClient.create(this.kafkaProperties); + } +} diff --git a/streams-bootstrap/src/main/java/com/bakdata/kafka/TopologyBuilder.java b/streams-bootstrap/src/main/java/com/bakdata/kafka/TopologyBuilder.java index 142a971f..c6372b2e 100644 --- a/streams-bootstrap/src/main/java/com/bakdata/kafka/TopologyBuilder.java +++ b/streams-bootstrap/src/main/java/com/bakdata/kafka/TopologyBuilder.java @@ -24,7 +24,6 @@ package com.bakdata.kafka; -import com.bakdata.kafka.util.ImprovedAdminClient; import java.util.Map; import lombok.AccessLevel; import lombok.Builder; @@ -136,14 +135,6 @@ public KStream streamInputPattern(final String role) { return this.streamsBuilder.stream(this.topics.getInputPattern(role)); } - /** - * Create a new {@code ImprovedAdminClient} using {@link #kafkaProperties} - * @return {@code ImprovedAdminClient} - */ - public ImprovedAdminClient createAdminClient() { - return ImprovedAdminClient.create(this.kafkaProperties); - } - Topology build() { return this.streamsBuilder.build(); }