From 5377422e22a2c98e602880c8506c1a2062df7726 Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Mon, 27 Jan 2025 09:00:01 +0100 Subject: [PATCH] Add error handling --- .../kafka/ConfiguredRepartitioned.java | 92 +++++++++++++++++++ .../kafka/ImprovedCogroupedStreamImpl.java | 3 +- .../kafka/ImprovedKGroupedStreamImpl.java | 3 + .../com/bakdata/kafka/ImprovedKStream.java | 2 + .../bakdata/kafka/ImprovedKStreamImpl.java | 74 ++++++++++----- .../com/bakdata/kafka/ImprovedKTableImpl.java | 75 ++++++++++----- .../com/bakdata/kafka/StreamsContext.java | 54 +++++++++++ 7 files changed, 256 insertions(+), 47 deletions(-) create mode 100644 streams-bootstrap-core/src/main/java/com/bakdata/kafka/ConfiguredRepartitioned.java diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ConfiguredRepartitioned.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ConfiguredRepartitioned.java new file mode 100644 index 00000000..b8738bf2 --- /dev/null +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ConfiguredRepartitioned.java @@ -0,0 +1,92 @@ +/* + * MIT License + * + * Copyright (c) 2025 bakdata + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.bakdata.kafka; + +import lombok.AccessLevel; +import lombok.RequiredArgsConstructor; +import lombok.With; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.streams.kstream.Repartitioned; +import org.apache.kafka.streams.processor.StreamPartitioner; + +@RequiredArgsConstructor(access = AccessLevel.PRIVATE) +public class ConfiguredRepartitioned { + + @With + private final Preconfigured> keySerde; + @With + private final Preconfigured> valueSerde; + @With + private final StreamPartitioner streamPartitioner; + @With + private final String name; + private final Integer numberOfPartitions; + + public static ConfiguredRepartitioned keySerde(final Preconfigured> keySerde) { + return with(keySerde, null); + } + + public static ConfiguredRepartitioned valueSerde(final Preconfigured> valueSerde) { + return with(null, valueSerde); + } + + public static ConfiguredRepartitioned with(final Preconfigured> keySerde, + final Preconfigured> valueSerde) { + return new ConfiguredRepartitioned<>(keySerde, valueSerde, null, null, null); + } + + public static ConfiguredRepartitioned as(final String name) { + return new ConfiguredRepartitioned<>(null, null, null, name, null); + } + + public static ConfiguredRepartitioned numberOfPartitions(final int numberOfPartitions) { + return new ConfiguredRepartitioned<>(null, null, null, null, numberOfPartitions); + } + + public static ConfiguredRepartitioned streamPartitioner(final StreamPartitioner partitioner) { + return new ConfiguredRepartitioned<>(null, null, partitioner, null, null); + } + + public ConfiguredRepartitioned withNumberOfPartitions(final int numberOfPartitions) { + return new ConfiguredRepartitioned<>(this.keySerde, this.valueSerde, this.streamPartitioner, this.name, + numberOfPartitions); + } + + Repartitioned configure(final Configurator configurator) { + return Repartitioned.as(this.name) + .withKeySerde(this.configureKeySerde(configurator)) + .withValueSerde(this.configuredValueSerde(configurator)) + .withStreamPartitioner(this.streamPartitioner) + .withNumberOfPartitions(this.numberOfPartitions); + } + + private Serde configuredValueSerde(final Configurator configurator) { + return this.valueSerde == null ? null : configurator.configureForValues(this.valueSerde); + } + + private Serde configureKeySerde(final Configurator configurator) { + return this.keySerde == null ? null : configurator.configureForKeys(this.keySerde); + } +} diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedCogroupedStreamImpl.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedCogroupedStreamImpl.java index 3e5d9db7..4471783b 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedCogroupedStreamImpl.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedCogroupedStreamImpl.java @@ -48,7 +48,8 @@ class ImprovedCogroupedStreamImpl implements ImprovedCogroupedKStream ImprovedCogroupedKStream cogroup(final KGroupedStream groupedStream, final Aggregator aggregator) { - return this.context.wrap(this.wrapped.cogroup(groupedStream, aggregator)); + final KGroupedStream other = StreamsContext.maybeUnwrap(groupedStream); + return this.context.wrap(this.wrapped.cogroup(other, aggregator)); } @Override diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedKGroupedStreamImpl.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedKGroupedStreamImpl.java index 743c06f9..f9a8d01f 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedKGroupedStreamImpl.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedKGroupedStreamImpl.java @@ -24,6 +24,8 @@ package com.bakdata.kafka; +import lombok.AccessLevel; +import lombok.Getter; import lombok.NonNull; import lombok.RequiredArgsConstructor; import org.apache.kafka.common.utils.Bytes; @@ -42,6 +44,7 @@ @RequiredArgsConstructor class ImprovedKGroupedStreamImpl implements ImprovedKGroupedStream { + @Getter(AccessLevel.PACKAGE) private final @NonNull KGroupedStream wrapped; private final @NonNull StreamsContext context; diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedKStream.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedKStream.java index 0f8f47c9..971ed76f 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedKStream.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedKStream.java @@ -234,6 +234,8 @@ KErrorStream flatMapValuesCapturingErrors( @Override ImprovedKStream repartition(Repartitioned repartitioned); + ImprovedKStream repartition(ConfiguredRepartitioned repartitioned); + void toOutputTopic(); void toOutputTopic(Produced produced); diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedKStreamImpl.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedKStreamImpl.java index af5b9388..2e42f444 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedKStreamImpl.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedKStreamImpl.java @@ -25,6 +25,8 @@ package com.bakdata.kafka; import java.util.Arrays; +import lombok.AccessLevel; +import lombok.Getter; import lombok.NonNull; import lombok.RequiredArgsConstructor; import org.apache.kafka.common.utils.Bytes; @@ -59,6 +61,7 @@ @RequiredArgsConstructor class ImprovedKStreamImpl implements ImprovedKStream { + @Getter(AccessLevel.PACKAGE) private final @NonNull KStream wrapped; private final @NonNull StreamsContext context; @@ -385,12 +388,14 @@ public ImprovedBranchedKStream split(final Named named) { @Override public ImprovedKStream merge(final KStream stream) { - return this.context.wrap(this.wrapped.merge(stream)); + final KStream other = StreamsContext.maybeUnwrap(stream); + return this.context.wrap(this.wrapped.merge(other)); } @Override public ImprovedKStream merge(final KStream stream, final Named named) { - return this.context.wrap(this.wrapped.merge(stream, named)); + final KStream other = StreamsContext.maybeUnwrap(stream); + return this.context.wrap(this.wrapped.merge(other, named)); } @Override @@ -413,6 +418,11 @@ public ImprovedKStream repartition(final Repartitioned repartitioned return this.context.wrap(this.wrapped.repartition(repartitioned)); } + @Override + public ImprovedKStream repartition(final ConfiguredRepartitioned repartitioned) { + return this.repartition(repartitioned.configure(this.context.getConfigurator())); + } + @Override public void to(final String topic) { this.wrapped.to(topic); @@ -523,132 +533,152 @@ public ImprovedKGroupedStream groupByKey(final Grouped grouped) { @Override public ImprovedKStream join(final KStream otherStream, final ValueJoiner joiner, final JoinWindows windows) { - return this.context.wrap(this.wrapped.join(otherStream, joiner, windows)); + final KStream other = StreamsContext.maybeUnwrap(otherStream); + return this.context.wrap(this.wrapped.join(other, joiner, windows)); } @Override public ImprovedKStream join(final KStream otherStream, final ValueJoinerWithKey joiner, final JoinWindows windows) { - return this.context.wrap(this.wrapped.join(otherStream, joiner, windows)); + final KStream other = StreamsContext.maybeUnwrap(otherStream); + return this.context.wrap(this.wrapped.join(other, joiner, windows)); } @Override public ImprovedKStream join(final KStream otherStream, final ValueJoiner joiner, final JoinWindows windows, final StreamJoined streamJoined) { - return this.context.wrap(this.wrapped.join(otherStream, joiner, windows, streamJoined)); + final KStream other = StreamsContext.maybeUnwrap(otherStream); + return this.context.wrap(this.wrapped.join(other, joiner, windows, streamJoined)); } @Override public ImprovedKStream join(final KStream otherStream, final ValueJoinerWithKey joiner, final JoinWindows windows, final StreamJoined streamJoined) { - return this.context.wrap(this.wrapped.join(otherStream, joiner, windows, streamJoined)); + final KStream other = StreamsContext.maybeUnwrap(otherStream); + return this.context.wrap(this.wrapped.join(other, joiner, windows, streamJoined)); } @Override public ImprovedKStream leftJoin(final KStream otherStream, final ValueJoiner joiner, final JoinWindows windows) { - return this.context.wrap(this.wrapped.leftJoin(otherStream, joiner, windows)); + final KStream other = StreamsContext.maybeUnwrap(otherStream); + return this.context.wrap(this.wrapped.leftJoin(other, joiner, windows)); } @Override public ImprovedKStream leftJoin(final KStream otherStream, final ValueJoinerWithKey joiner, final JoinWindows windows) { - return this.context.wrap(this.wrapped.leftJoin(otherStream, joiner, windows)); + final KStream other = StreamsContext.maybeUnwrap(otherStream); + return this.context.wrap(this.wrapped.leftJoin(other, joiner, windows)); } @Override public ImprovedKStream leftJoin(final KStream otherStream, final ValueJoiner joiner, final JoinWindows windows, final StreamJoined streamJoined) { - return this.context.wrap(this.wrapped.leftJoin(otherStream, joiner, windows, streamJoined)); + final KStream other = StreamsContext.maybeUnwrap(otherStream); + return this.context.wrap(this.wrapped.leftJoin(other, joiner, windows, streamJoined)); } @Override public ImprovedKStream leftJoin(final KStream otherStream, final ValueJoinerWithKey joiner, final JoinWindows windows, final StreamJoined streamJoined) { - return this.context.wrap(this.wrapped.leftJoin(otherStream, joiner, windows, streamJoined)); + final KStream other = StreamsContext.maybeUnwrap(otherStream); + return this.context.wrap(this.wrapped.leftJoin(other, joiner, windows, streamJoined)); } @Override public ImprovedKStream outerJoin(final KStream otherStream, final ValueJoiner joiner, final JoinWindows windows) { - return this.context.wrap(this.wrapped.outerJoin(otherStream, joiner, windows)); + final KStream other = StreamsContext.maybeUnwrap(otherStream); + return this.context.wrap(this.wrapped.outerJoin(other, joiner, windows)); } @Override public ImprovedKStream outerJoin(final KStream otherStream, final ValueJoinerWithKey joiner, final JoinWindows windows) { - return this.context.wrap(this.wrapped.outerJoin(otherStream, joiner, windows)); + final KStream other = StreamsContext.maybeUnwrap(otherStream); + return this.context.wrap(this.wrapped.outerJoin(other, joiner, windows)); } @Override public ImprovedKStream outerJoin(final KStream otherStream, final ValueJoiner joiner, final JoinWindows windows, final StreamJoined streamJoined) { - return this.context.wrap(this.wrapped.outerJoin(otherStream, joiner, windows, streamJoined)); + final KStream other = StreamsContext.maybeUnwrap(otherStream); + return this.context.wrap(this.wrapped.outerJoin(other, joiner, windows, streamJoined)); } @Override public ImprovedKStream outerJoin(final KStream otherStream, final ValueJoinerWithKey joiner, final JoinWindows windows, final StreamJoined streamJoined) { - return this.context.wrap(this.wrapped.outerJoin(otherStream, joiner, windows, streamJoined)); + final KStream other = StreamsContext.maybeUnwrap(otherStream); + return this.context.wrap(this.wrapped.outerJoin(other, joiner, windows, streamJoined)); } @Override public ImprovedKStream join(final KTable table, final ValueJoiner joiner) { - return this.context.wrap(this.wrapped.join(table, joiner)); + final KTable other = StreamsContext.maybeUnwrap(table); + return this.context.wrap(this.wrapped.join(other, joiner)); } @Override public ImprovedKStream join(final KTable table, final ValueJoinerWithKey joiner) { - return this.context.wrap(this.wrapped.join(table, joiner)); + final KTable other = StreamsContext.maybeUnwrap(table); + return this.context.wrap(this.wrapped.join(other, joiner)); } @Override public ImprovedKStream join(final KTable table, final ValueJoiner joiner, final Joined joined) { - return this.context.wrap(this.wrapped.join(table, joiner, joined)); + final KTable other = StreamsContext.maybeUnwrap(table); + return this.context.wrap(this.wrapped.join(other, joiner, joined)); } @Override public ImprovedKStream join(final KTable table, final ValueJoinerWithKey joiner, final Joined joined) { - return this.context.wrap(this.wrapped.join(table, joiner, joined)); + final KTable other = StreamsContext.maybeUnwrap(table); + return this.context.wrap(this.wrapped.join(other, joiner, joined)); } @Override public ImprovedKStream leftJoin(final KTable table, final ValueJoiner joiner) { - return this.context.wrap(this.wrapped.leftJoin(table, joiner)); + final KTable other = StreamsContext.maybeUnwrap(table); + return this.context.wrap(this.wrapped.leftJoin(other, joiner)); } @Override public ImprovedKStream leftJoin(final KTable table, final ValueJoinerWithKey joiner) { - return this.context.wrap(this.wrapped.leftJoin(table, joiner)); + final KTable other = StreamsContext.maybeUnwrap(table); + return this.context.wrap(this.wrapped.leftJoin(other, joiner)); } @Override public ImprovedKStream leftJoin(final KTable table, final ValueJoiner joiner, final Joined joined) { - return this.context.wrap(this.wrapped.leftJoin(table, joiner, joined)); + final KTable other = StreamsContext.maybeUnwrap(table); + return this.context.wrap(this.wrapped.leftJoin(other, joiner, joined)); } @Override public ImprovedKStream leftJoin(final KTable table, final ValueJoinerWithKey joiner, final Joined joined) { - return this.context.wrap(this.wrapped.leftJoin(table, joiner, joined)); + final KTable other = StreamsContext.maybeUnwrap(table); + return this.context.wrap(this.wrapped.leftJoin(other, joiner, joined)); } @Override diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedKTableImpl.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedKTableImpl.java index dff295f8..931f1911 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedKTableImpl.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedKTableImpl.java @@ -25,6 +25,8 @@ package com.bakdata.kafka; import java.util.function.Function; +import lombok.AccessLevel; +import lombok.Getter; import lombok.NonNull; import lombok.RequiredArgsConstructor; import org.apache.kafka.common.utils.Bytes; @@ -46,6 +48,7 @@ @RequiredArgsConstructor class ImprovedKTableImpl implements ImprovedKTable { + @Getter(AccessLevel.PROTECTED) private final @NonNull KTable wrapped; private final @NonNull StreamsContext context; @@ -210,107 +213,123 @@ public ImprovedKGroupedTable groupBy( @Override public ImprovedKTable join(final KTable other, final ValueJoiner joiner) { - return this.context.wrap(this.wrapped.join(other, joiner)); + final KTable otherTable = StreamsContext.maybeUnwrap(other); + return this.context.wrap(this.wrapped.join(otherTable, joiner)); } @Override public ImprovedKTable join(final KTable other, final ValueJoiner joiner, final Named named) { - return this.context.wrap(this.wrapped.join(other, joiner, named)); + final KTable otherTable = StreamsContext.maybeUnwrap(other); + return this.context.wrap(this.wrapped.join(otherTable, joiner, named)); } @Override public ImprovedKTable join(final KTable other, final ValueJoiner joiner, final Materialized> materialized) { - return this.context.wrap(this.wrapped.join(other, joiner, materialized)); + final KTable otherTable = StreamsContext.maybeUnwrap(other); + return this.context.wrap(this.wrapped.join(otherTable, joiner, materialized)); } @Override public ImprovedKTable join(final KTable other, final ValueJoiner joiner, final Named named, final Materialized> materialized) { - return this.context.wrap(this.wrapped.join(other, joiner, materialized)); + final KTable otherTable = StreamsContext.maybeUnwrap(other); + return this.context.wrap(this.wrapped.join(otherTable, joiner, materialized)); } @Override public ImprovedKTable leftJoin(final KTable other, final ValueJoiner joiner) { - return this.context.wrap(this.wrapped.leftJoin(other, joiner)); + final KTable otherTable = StreamsContext.maybeUnwrap(other); + return this.context.wrap(this.wrapped.leftJoin(otherTable, joiner)); } @Override public ImprovedKTable leftJoin(final KTable other, final ValueJoiner joiner, final Named named) { - return this.context.wrap(this.wrapped.leftJoin(other, joiner, named)); + final KTable otherTable = StreamsContext.maybeUnwrap(other); + return this.context.wrap(this.wrapped.leftJoin(otherTable, joiner, named)); } @Override public ImprovedKTable leftJoin(final KTable other, final ValueJoiner joiner, final Materialized> materialized) { - return this.context.wrap(this.wrapped.leftJoin(other, joiner, materialized)); + final KTable otherTable = StreamsContext.maybeUnwrap(other); + return this.context.wrap(this.wrapped.leftJoin(otherTable, joiner, materialized)); } @Override public ImprovedKTable leftJoin(final KTable other, final ValueJoiner joiner, final Named named, final Materialized> materialized) { - return this.context.wrap(this.wrapped.leftJoin(other, joiner, materialized)); + final KTable otherTable = StreamsContext.maybeUnwrap(other); + return this.context.wrap(this.wrapped.leftJoin(otherTable, joiner, materialized)); } @Override public ImprovedKTable outerJoin(final KTable other, final ValueJoiner joiner) { - return this.context.wrap(this.wrapped.outerJoin(other, joiner)); + final KTable otherTable = StreamsContext.maybeUnwrap(other); + return this.context.wrap(this.wrapped.outerJoin(otherTable, joiner)); } @Override public ImprovedKTable outerJoin(final KTable other, final ValueJoiner joiner, final Named named) { - return this.context.wrap(this.wrapped.outerJoin(other, joiner, named)); + final KTable otherTable = StreamsContext.maybeUnwrap(other); + return this.context.wrap(this.wrapped.outerJoin(otherTable, joiner, named)); } @Override public ImprovedKTable outerJoin(final KTable other, final ValueJoiner joiner, final Materialized> materialized) { - return this.context.wrap(this.wrapped.outerJoin(other, joiner, materialized)); + final KTable otherTable = StreamsContext.maybeUnwrap(other); + return this.context.wrap(this.wrapped.outerJoin(otherTable, joiner, materialized)); } @Override public ImprovedKTable outerJoin(final KTable other, final ValueJoiner joiner, final Named named, final Materialized> materialized) { - return this.context.wrap(this.wrapped.outerJoin(other, joiner, materialized)); + final KTable otherTable = StreamsContext.maybeUnwrap(other); + return this.context.wrap(this.wrapped.outerJoin(otherTable, joiner, materialized)); } @Override public ImprovedKTable join(final KTable other, final Function foreignKeyExtractor, final ValueJoiner joiner) { - return this.context.wrap(this.wrapped.join(other, foreignKeyExtractor, joiner)); + final KTable otherTable = StreamsContext.maybeUnwrap(other); + return this.context.wrap(this.wrapped.join(otherTable, foreignKeyExtractor, joiner)); } @Override public ImprovedKTable join(final KTable other, final Function foreignKeyExtractor, final ValueJoiner joiner, final Named named) { - return this.context.wrap(this.wrapped.join(other, foreignKeyExtractor, joiner, named)); + final KTable otherTable = StreamsContext.maybeUnwrap(other); + return this.context.wrap(this.wrapped.join(otherTable, foreignKeyExtractor, joiner, named)); } @Override public ImprovedKTable join(final KTable other, final Function foreignKeyExtractor, final ValueJoiner joiner, final TableJoined tableJoined) { - return this.context.wrap(this.wrapped.join(other, foreignKeyExtractor, joiner, tableJoined)); + final KTable otherTable = StreamsContext.maybeUnwrap(other); + return this.context.wrap(this.wrapped.join(otherTable, foreignKeyExtractor, joiner, tableJoined)); } @Override public ImprovedKTable join(final KTable other, final Function foreignKeyExtractor, final ValueJoiner joiner, final Materialized> materialized) { - return this.context.wrap(this.wrapped.join(other, foreignKeyExtractor, joiner, materialized)); + final KTable otherTable = StreamsContext.maybeUnwrap(other); + return this.context.wrap(this.wrapped.join(otherTable, foreignKeyExtractor, joiner, materialized)); } @Override @@ -318,7 +337,8 @@ public ImprovedKTable join(final KTable other, final Function foreignKeyExtractor, final ValueJoiner joiner, final Named named, final Materialized> materialized) { - return this.context.wrap(this.wrapped.join(other, foreignKeyExtractor, joiner, named, materialized)); + final KTable otherTable = StreamsContext.maybeUnwrap(other); + return this.context.wrap(this.wrapped.join(otherTable, foreignKeyExtractor, joiner, named, materialized)); } @Override @@ -326,35 +346,40 @@ public ImprovedKTable join(final KTable other, final Function foreignKeyExtractor, final ValueJoiner joiner, final TableJoined tableJoined, final Materialized> materialized) { - return this.context.wrap(this.wrapped.join(other, foreignKeyExtractor, joiner, tableJoined, materialized)); + final KTable otherTable = StreamsContext.maybeUnwrap(other); + return this.context.wrap(this.wrapped.join(otherTable, foreignKeyExtractor, joiner, tableJoined, materialized)); } @Override public ImprovedKTable leftJoin(final KTable other, final Function foreignKeyExtractor, final ValueJoiner joiner) { - return this.context.wrap(this.wrapped.leftJoin(other, foreignKeyExtractor, joiner)); + final KTable otherTable = StreamsContext.maybeUnwrap(other); + return this.context.wrap(this.wrapped.leftJoin(otherTable, foreignKeyExtractor, joiner)); } @Override public ImprovedKTable leftJoin(final KTable other, final Function foreignKeyExtractor, final ValueJoiner joiner, final Named named) { - return this.context.wrap(this.wrapped.leftJoin(other, foreignKeyExtractor, joiner, named)); + final KTable otherTable = StreamsContext.maybeUnwrap(other); + return this.context.wrap(this.wrapped.leftJoin(otherTable, foreignKeyExtractor, joiner, named)); } @Override public ImprovedKTable leftJoin(final KTable other, final Function foreignKeyExtractor, final ValueJoiner joiner, final TableJoined tableJoined) { - return this.context.wrap(this.wrapped.leftJoin(other, foreignKeyExtractor, joiner, tableJoined)); + final KTable otherTable = StreamsContext.maybeUnwrap(other); + return this.context.wrap(this.wrapped.leftJoin(otherTable, foreignKeyExtractor, joiner, tableJoined)); } @Override public ImprovedKTable leftJoin(final KTable other, final Function foreignKeyExtractor, final ValueJoiner joiner, final Materialized> materialized) { - return this.context.wrap(this.wrapped.leftJoin(other, foreignKeyExtractor, joiner, materialized)); + final KTable otherTable = StreamsContext.maybeUnwrap(other); + return this.context.wrap(this.wrapped.leftJoin(otherTable, foreignKeyExtractor, joiner, materialized)); } @Override @@ -362,7 +387,8 @@ public ImprovedKTable leftJoin(final KTable other, final Function foreignKeyExtractor, final ValueJoiner joiner, final Named named, final Materialized> materialized) { - return this.context.wrap(this.wrapped.leftJoin(other, foreignKeyExtractor, joiner, named, materialized)); + final KTable otherTable = StreamsContext.maybeUnwrap(other); + return this.context.wrap(this.wrapped.leftJoin(otherTable, foreignKeyExtractor, joiner, named, materialized)); } @Override @@ -370,8 +396,9 @@ public ImprovedKTable leftJoin(final KTable other, final Function foreignKeyExtractor, final ValueJoiner joiner, final TableJoined tableJoined, final Materialized> materialized) { + final KTable otherTable = StreamsContext.maybeUnwrap(other); return this.context.wrap( - this.wrapped.leftJoin(other, foreignKeyExtractor, joiner, tableJoined, materialized)); + this.wrapped.leftJoin(otherTable, foreignKeyExtractor, joiner, tableJoined, materialized)); } @Override diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/StreamsContext.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/StreamsContext.java index d90a00e7..91a74930 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/StreamsContext.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/StreamsContext.java @@ -48,47 +48,101 @@ public class StreamsContext { @NonNull Configurator configurator; + static KStream maybeUnwrap(final KStream stream) { + if (stream instanceof ImprovedKStreamImpl) { + return ((ImprovedKStreamImpl) stream).getWrapped(); // Kafka Streams internally casts KStream to + // KStreamImpl in some cases + } + return stream; + } + + static KGroupedStream maybeUnwrap(final KGroupedStream stream) { + if (stream instanceof ImprovedKGroupedStream) { + return ((ImprovedKGroupedStreamImpl) stream).getWrapped(); // Kafka Streams internally casts + // KGroupedStream to KGroupedStreamImpl in some cases + } + return stream; + } + + static KTable maybeUnwrap(final KTable table) { + if (table instanceof ImprovedKTableImpl) { + return ((ImprovedKTableImpl) table).getWrapped(); // Kafka Streams internally casts KTable to + // KTableImpl in some cases + } + return table; + } + public ImprovedKStream wrap(final KStream stream) { + if (stream instanceof ImprovedKStream) { + return (ImprovedKStream) stream; + } return new ImprovedKStreamImpl<>(stream, this); } public ImprovedKGroupedStream wrap(final KGroupedStream stream) { + if (stream instanceof ImprovedKGroupedStream) { + return (ImprovedKGroupedStream) stream; + } return new ImprovedKGroupedStreamImpl<>(stream, this); } public ImprovedTimeWindowedKStream wrap( final TimeWindowedKStream stream) { + if (stream instanceof ImprovedTimeWindowedKStream) { + return (ImprovedTimeWindowedKStream) stream; + } return new ImprovedTimeWindowedStreamImpl<>(stream, this); } public ImprovedSessionWindowedKStream wrap( final SessionWindowedKStream stream) { + if (stream instanceof ImprovedSessionWindowedKStream) { + return (ImprovedSessionWindowedKStream) stream; + } return new ImprovedSessionWindowedStreamImpl<>(stream, this); } public ImprovedTimeWindowedCogroupedKStream wrap( final TimeWindowedCogroupedKStream stream) { + if (stream instanceof ImprovedTimeWindowedCogroupedKStream) { + return (ImprovedTimeWindowedCogroupedKStream) stream; + } return new ImprovedTimeWindowedCogroupedStreamImpl<>(stream, this); } public ImprovedSessionWindowedCogroupedKStream wrap( final SessionWindowedCogroupedKStream stream) { + if (stream instanceof ImprovedSessionWindowedCogroupedKStream) { + return (ImprovedSessionWindowedCogroupedKStream) stream; + } return new ImprovedSessionWindowedCogroupedStreamImpl<>(stream, this); } public ImprovedCogroupedKStream wrap(final CogroupedKStream stream) { + if (stream instanceof ImprovedCogroupedKStream) { + return (ImprovedCogroupedKStream) stream; + } return new ImprovedCogroupedStreamImpl<>(stream, this); } public ImprovedBranchedKStream wrap(final BranchedKStream stream) { + if (stream instanceof ImprovedBranchedKStream) { + return (ImprovedBranchedKStream) stream; + } return new ImprovedBranchedKStreamImpl<>(stream, this); } public ImprovedKTable wrap(final KTable table) { + if (table instanceof ImprovedKTable) { + return (ImprovedKTable) table; + } return new ImprovedKTableImpl<>(table, this); } public ImprovedKGroupedTable wrap(final KGroupedTable table) { + if (table instanceof ImprovedKGroupedTable) { + return (ImprovedKGroupedTable) table; + } return new ImprovedKGroupedTableImpl<>(table, this); } }