From a5f272e7cf0e92c3269f1a8b0acd2fb94b3a9329 Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Mon, 27 Jan 2025 10:48:22 +0100 Subject: [PATCH] Add error handling --- .../bakdata/kafka/ConfiguredMaterialized.java | 109 +++++++++++++ .../kafka/ImprovedCogroupedKStream.java | 6 + .../kafka/ImprovedCogroupedStreamImpl.java | 12 ++ .../bakdata/kafka/ImprovedKGroupedStream.java | 17 ++ .../kafka/ImprovedKGroupedStreamImpl.java | 37 +++++ .../bakdata/kafka/ImprovedKGroupedTable.java | 19 +++ .../kafka/ImprovedKGroupedTableImpl.java | 37 +++++ .../com/bakdata/kafka/ImprovedKStream.java | 4 + .../bakdata/kafka/ImprovedKStreamImpl.java | 11 ++ .../com/bakdata/kafka/ImprovedKTable.java | 77 +++++++++ .../com/bakdata/kafka/ImprovedKTableImpl.java | 147 ++++++++++++++++++ ...provedSessionWindowedCogroupedKStream.java | 6 + ...vedSessionWindowedCogroupedStreamImpl.java | 12 ++ .../kafka/ImprovedSessionWindowedKStream.java | 21 +++ .../ImprovedSessionWindowedStreamImpl.java | 39 +++++ .../ImprovedTimeWindowedCogroupedKStream.java | 6 + ...provedTimeWindowedCogroupedStreamImpl.java | 12 ++ .../kafka/ImprovedTimeWindowedKStream.java | 19 +++ .../kafka/ImprovedTimeWindowedStreamImpl.java | 38 +++++ 19 files changed, 629 insertions(+) create mode 100644 streams-bootstrap-core/src/main/java/com/bakdata/kafka/ConfiguredMaterialized.java diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ConfiguredMaterialized.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ConfiguredMaterialized.java new file mode 100644 index 00000000..b7d24640 --- /dev/null +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ConfiguredMaterialized.java @@ -0,0 +1,109 @@ +/* + * MIT License + * + * Copyright (c) 2025 bakdata + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.bakdata.kafka; + +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import lombok.AccessLevel; +import lombok.RequiredArgsConstructor; +import lombok.With; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.state.DslStoreSuppliers; + +@RequiredArgsConstructor(access = AccessLevel.PRIVATE) +public class ConfiguredMaterialized { + + @With + private final Preconfigured> keySerde; + @With + private final Preconfigured> valueSerde; + private final String storeName; + @With + private final Duration retention; + @With + private final DslStoreSuppliers storeType; + private final Map topicConfig; + private final boolean loggingEnabled; + private final boolean cachingEnabled; + + public static ConfiguredMaterialized keySerde( + final Preconfigured> keySerde) { + return with(keySerde, null); + } + + public static ConfiguredMaterialized valueSerde( + final Preconfigured> valueSerde) { + return with(null, valueSerde); + } + + public static ConfiguredMaterialized with( + final Preconfigured> keySerde, + final Preconfigured> valueSerde) { + return new ConfiguredMaterialized<>(keySerde, valueSerde, null, null, null, new HashMap<>(), true, true); + } + + public static ConfiguredMaterialized as(final String storeName) { + return new ConfiguredMaterialized<>(null, null, storeName, null, null, new HashMap<>(), true, true); + } + + public static ConfiguredMaterialized as( + final DslStoreSuppliers storeSuppliers) { + return new ConfiguredMaterialized<>(null, null, null, null, storeSuppliers, new HashMap<>(), true, true); + } + + Materialized configure(final Configurator configurator) { + final Materialized materialized = Materialized.as(this.storeName) + .withKeySerde(this.configureKeySerde(configurator)) + .withValueSerde(this.configuredValueSerde(configurator)); + if (this.retention != null) { + materialized.withRetention(this.retention); + } + if (this.storeType != null) { + materialized.withStoreType(this.storeType); + } + if (this.loggingEnabled) { + materialized.withLoggingEnabled(this.topicConfig); + } else { + materialized.withLoggingDisabled(); + } + if (this.cachingEnabled) { + materialized.withCachingEnabled(); + } else { + materialized.withCachingDisabled(); + } + return materialized; + } + + private Serde configuredValueSerde(final Configurator configurator) { + return this.valueSerde == null ? null : configurator.configureForValues(this.valueSerde); + } + + private Serde configureKeySerde(final Configurator configurator) { + return this.keySerde == null ? null : configurator.configureForKeys(this.keySerde); + } +} diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedCogroupedKStream.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedCogroupedKStream.java index 89a4e36e..4d36833d 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedCogroupedKStream.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedCogroupedKStream.java @@ -53,10 +53,16 @@ ImprovedCogroupedKStream cogroup(KGroupedStream groupedSt ImprovedKTable aggregate(Initializer initializer, Materialized> materialized); + ImprovedKTable aggregate(Initializer initializer, + ConfiguredMaterialized> materialized); + @Override ImprovedKTable aggregate(Initializer initializer, Named named, Materialized> materialized); + ImprovedKTable aggregate(Initializer initializer, Named named, + ConfiguredMaterialized> materialized); + @Override ImprovedTimeWindowedCogroupedKStream windowedBy(Windows windows); diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedCogroupedStreamImpl.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedCogroupedStreamImpl.java index 4471783b..b2b8cc49 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedCogroupedStreamImpl.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedCogroupedStreamImpl.java @@ -68,12 +68,24 @@ public ImprovedKTable aggregate(final Initializer initializer, return this.context.wrap(this.wrapped.aggregate(initializer, materialized)); } + @Override + public ImprovedKTable aggregate(final Initializer initializer, + final ConfiguredMaterialized> materialized) { + return this.aggregate(initializer, materialized.configure(this.context.getConfigurator())); + } + @Override public ImprovedKTable aggregate(final Initializer initializer, final Named named, final Materialized> materialized) { return this.context.wrap(this.wrapped.aggregate(initializer, named, materialized)); } + @Override + public ImprovedKTable aggregate(final Initializer initializer, final Named named, + final ConfiguredMaterialized> materialized) { + return this.aggregate(initializer, named, materialized.configure(this.context.getConfigurator())); + } + @Override public ImprovedTimeWindowedCogroupedKStream windowedBy(final Windows windows) { return this.context.wrap(this.wrapped.windowedBy(windows)); diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedKGroupedStream.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedKGroupedStream.java index 3fc76ed8..8c54bae1 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedKGroupedStream.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedKGroupedStream.java @@ -48,19 +48,30 @@ public interface ImprovedKGroupedStream extends KGroupedStream { @Override ImprovedKTable count(Materialized> materialized); + ImprovedKTable count(ConfiguredMaterialized> materialized); + @Override ImprovedKTable count(Named named, Materialized> materialized); + ImprovedKTable count(Named named, + ConfiguredMaterialized> materialized); + @Override ImprovedKTable reduce(Reducer reducer); @Override ImprovedKTable reduce(Reducer reducer, Materialized> materialized); + ImprovedKTable reduce(Reducer reducer, + ConfiguredMaterialized> materialized); + @Override ImprovedKTable reduce(Reducer reducer, Named named, Materialized> materialized); + ImprovedKTable reduce(Reducer reducer, Named named, + ConfiguredMaterialized> materialized); + @Override ImprovedKTable aggregate(Initializer initializer, Aggregator aggregator); @@ -68,10 +79,16 @@ ImprovedKTable reduce(Reducer reducer, Named named, ImprovedKTable aggregate(Initializer initializer, Aggregator aggregator, Materialized> materialized); + ImprovedKTable aggregate(Initializer initializer, Aggregator aggregator, + ConfiguredMaterialized> materialized); + @Override ImprovedKTable aggregate(Initializer initializer, Aggregator aggregator, Named named, Materialized> materialized); + ImprovedKTable aggregate(Initializer initializer, Aggregator aggregator, + Named named, ConfiguredMaterialized> materialized); + @Override ImprovedTimeWindowedKStream windowedBy(Windows windows); diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedKGroupedStreamImpl.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedKGroupedStreamImpl.java index f9a8d01f..27014dc6 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedKGroupedStreamImpl.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedKGroupedStreamImpl.java @@ -63,12 +63,23 @@ public ImprovedKTable count(final Materialized count(final ConfiguredMaterialized> materialized) { + return this.count(materialized.configure(this.context.getConfigurator())); + } + @Override public ImprovedKTable count(final Named named, final Materialized> materialized) { return this.context.wrap(this.wrapped.count(named, materialized)); } + @Override + public ImprovedKTable count(final Named named, + final ConfiguredMaterialized> materialized) { + return this.count(named, materialized.configure(this.context.getConfigurator())); + } + @Override public ImprovedKTable reduce(final Reducer reducer) { return this.context.wrap(this.wrapped.reduce(reducer)); @@ -80,12 +91,24 @@ public ImprovedKTable reduce(final Reducer reducer, return this.context.wrap(this.wrapped.reduce(reducer, materialized)); } + @Override + public ImprovedKTable reduce(final Reducer reducer, + final ConfiguredMaterialized> materialized) { + return this.reduce(reducer, materialized.configure(this.context.getConfigurator())); + } + @Override public ImprovedKTable reduce(final Reducer reducer, final Named named, final Materialized> materialized) { return this.context.wrap(this.wrapped.reduce(reducer, named, materialized)); } + @Override + public ImprovedKTable reduce(final Reducer reducer, final Named named, + final ConfiguredMaterialized> materialized) { + return this.reduce(reducer, named, materialized.configure(this.context.getConfigurator())); + } + @Override public ImprovedKTable aggregate(final Initializer initializer, final Aggregator aggregator) { @@ -99,6 +122,13 @@ public ImprovedKTable aggregate(final Initializer initializer, return this.context.wrap(this.wrapped.aggregate(initializer, aggregator, materialized)); } + @Override + public ImprovedKTable aggregate(final Initializer initializer, + final Aggregator aggregator, + final ConfiguredMaterialized> materialized) { + return this.aggregate(initializer, aggregator, materialized.configure(this.context.getConfigurator())); + } + @Override public ImprovedKTable aggregate(final Initializer initializer, final Aggregator aggregator, final Named named, @@ -106,6 +136,13 @@ public ImprovedKTable aggregate(final Initializer initializer, return this.context.wrap(this.wrapped.aggregate(initializer, aggregator, named, materialized)); } + @Override + public ImprovedKTable aggregate(final Initializer initializer, + final Aggregator aggregator, final Named named, + final ConfiguredMaterialized> materialized) { + return this.aggregate(initializer, aggregator, named, materialized.configure(this.context.getConfigurator())); + } + @Override public ImprovedTimeWindowedKStream windowedBy(final Windows windows) { return this.context.wrap(this.wrapped.windowedBy(windows)); diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedKGroupedTable.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedKGroupedTable.java index e4d8e23a..65aecfe2 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedKGroupedTable.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedKGroupedTable.java @@ -38,9 +38,14 @@ public interface ImprovedKGroupedTable extends KGroupedTable { @Override ImprovedKTable count(Materialized> materialized); + ImprovedKTable count(ConfiguredMaterialized> materialized); + @Override ImprovedKTable count(Named named, Materialized> materialized); + ImprovedKTable count(Named named, + ConfiguredMaterialized> materialized); + @Override ImprovedKTable count(); @@ -51,10 +56,16 @@ public interface ImprovedKGroupedTable extends KGroupedTable { ImprovedKTable reduce(Reducer adder, Reducer subtractor, Materialized> materialized); + ImprovedKTable reduce(Reducer adder, Reducer subtractor, + ConfiguredMaterialized> materialized); + @Override ImprovedKTable reduce(Reducer adder, Reducer subtractor, Named named, Materialized> materialized); + ImprovedKTable reduce(Reducer adder, Reducer subtractor, Named named, + ConfiguredMaterialized> materialized); + @Override ImprovedKTable reduce(Reducer adder, Reducer subtractor); @@ -63,11 +74,19 @@ ImprovedKTable aggregate(Initializer initializer, Aggregator subtractor, Materialized> materialized); + ImprovedKTable aggregate(Initializer initializer, Aggregator adder, + Aggregator subtractor, + ConfiguredMaterialized> materialized); + @Override ImprovedKTable aggregate(Initializer initializer, Aggregator adder, Aggregator subtractor, Named named, Materialized> materialized); + ImprovedKTable aggregate(Initializer initializer, Aggregator adder, + Aggregator subtractor, Named named, + ConfiguredMaterialized> materialized); + @Override ImprovedKTable aggregate(Initializer initializer, Aggregator adder, Aggregator subtractor); diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedKGroupedTableImpl.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedKGroupedTableImpl.java index a82984f6..76089fe6 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedKGroupedTableImpl.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedKGroupedTableImpl.java @@ -46,12 +46,23 @@ public ImprovedKTable count(final Materialized count(final ConfiguredMaterialized> materialized) { + return this.count(materialized.configure(this.context.getConfigurator())); + } + @Override public ImprovedKTable count(final Named named, final Materialized> materialized) { return this.context.wrap(this.wrapped.count(named, materialized)); } + @Override + public ImprovedKTable count(final Named named, + final ConfiguredMaterialized> materialized) { + return this.count(named, materialized.configure(this.context.getConfigurator())); + } + @Override public ImprovedKTable count() { return this.context.wrap(this.wrapped.count()); @@ -68,12 +79,24 @@ public ImprovedKTable reduce(final Reducer adder, final Reducer subt return this.context.wrap(this.wrapped.reduce(adder, subtractor, materialized)); } + @Override + public ImprovedKTable reduce(final Reducer adder, final Reducer subtractor, + final ConfiguredMaterialized> materialized) { + return this.reduce(adder, subtractor, materialized.configure(this.context.getConfigurator())); + } + @Override public ImprovedKTable reduce(final Reducer adder, final Reducer subtractor, final Named named, final Materialized> materialized) { return this.context.wrap(this.wrapped.reduce(adder, subtractor, materialized)); } + @Override + public ImprovedKTable reduce(final Reducer adder, final Reducer subtractor, final Named named, + final ConfiguredMaterialized> materialized) { + return this.reduce(adder, subtractor, named, materialized.configure(this.context.getConfigurator())); + } + @Override public ImprovedKTable reduce(final Reducer adder, final Reducer subtractor) { return this.context.wrap(this.wrapped.reduce(adder, subtractor)); @@ -87,6 +110,13 @@ public ImprovedKTable aggregate(final Initializer initializer, return this.context.wrap(this.wrapped.aggregate(initializer, adder, subtractor, materialized)); } + @Override + public ImprovedKTable aggregate(final Initializer initializer, final Aggregator adder, + final Aggregator subtractor, + final ConfiguredMaterialized> materialized) { + return this.aggregate(initializer, adder, subtractor, materialized.configure(this.context.getConfigurator())); + } + @Override public ImprovedKTable aggregate(final Initializer initializer, final Aggregator adder, @@ -95,6 +125,13 @@ public ImprovedKTable aggregate(final Initializer initializer, return this.context.wrap(this.wrapped.aggregate(initializer, adder, subtractor, materialized)); } + @Override + public ImprovedKTable aggregate(final Initializer initializer, final Aggregator adder, + final Aggregator subtractor, final Named named, + final ConfiguredMaterialized> materialized) { + return this.aggregate(initializer, adder, subtractor, named, materialized.configure(this.context.getConfigurator())); + } + @Override public ImprovedKTable aggregate(final Initializer initializer, final Aggregator adder, diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedKStream.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedKStream.java index 971ed76f..c9bdb0c0 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedKStream.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedKStream.java @@ -263,9 +263,13 @@ KErrorStream flatMapValuesCapturingErrors( @Override ImprovedKTable toTable(Materialized> materialized); + ImprovedKTable toTable(ConfiguredMaterialized> materialized); + @Override ImprovedKTable toTable(Named named, Materialized> materialized); + ImprovedKTable toTable(Named named, ConfiguredMaterialized> materialized); + @Override ImprovedKGroupedStream groupBy(KeyValueMapper keySelector); diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedKStreamImpl.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedKStreamImpl.java index 2e42f444..50bc2750 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedKStreamImpl.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedKStreamImpl.java @@ -503,12 +503,23 @@ public ImprovedKTable toTable(final Materialized toTable(final ConfiguredMaterialized> materialized) { + return this.toTable(materialized.configure(this.context.getConfigurator())); + } + @Override public ImprovedKTable toTable(final Named named, final Materialized> materialized) { return this.context.wrap(this.wrapped.toTable(named, materialized)); } + @Override + public ImprovedKTable toTable(final Named named, + final ConfiguredMaterialized> materialized) { + return this.toTable(named, materialized.configure(this.context.getConfigurator())); + } + @Override public ImprovedKGroupedStream groupBy(final KeyValueMapper keySelector) { return this.context.wrap(this.wrapped.groupBy(keySelector)); diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedKTable.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedKTable.java index 459e1326..583da8b4 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedKTable.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedKTable.java @@ -53,10 +53,16 @@ public interface ImprovedKTable extends KTable { ImprovedKTable filter(Predicate predicate, Materialized> materialized); + ImprovedKTable filter(Predicate predicate, + ConfiguredMaterialized> materialized); + @Override ImprovedKTable filter(Predicate predicate, Named named, Materialized> materialized); + ImprovedKTable filter(Predicate predicate, Named named, + ConfiguredMaterialized> materialized); + @Override ImprovedKTable filterNot(Predicate predicate); @@ -67,10 +73,16 @@ ImprovedKTable filter(Predicate predicate, Named nam ImprovedKTable filterNot(Predicate predicate, Materialized> materialized); + ImprovedKTable filterNot(Predicate predicate, + ConfiguredMaterialized> materialized); + @Override ImprovedKTable filterNot(Predicate predicate, Named named, Materialized> materialized); + ImprovedKTable filterNot(Predicate predicate, Named named, + ConfiguredMaterialized> materialized); + @Override ImprovedKTable mapValues(ValueMapper mapper); @@ -87,18 +99,30 @@ ImprovedKTable filterNot(Predicate predicate, Named ImprovedKTable mapValues(ValueMapper mapper, Materialized> materialized); + ImprovedKTable mapValues(ValueMapper mapper, + ConfiguredMaterialized> materialized); + @Override ImprovedKTable mapValues(ValueMapper mapper, Named named, Materialized> materialized); + ImprovedKTable mapValues(ValueMapper mapper, Named named, + ConfiguredMaterialized> materialized); + @Override ImprovedKTable mapValues(ValueMapperWithKey mapper, Materialized> materialized); + ImprovedKTable mapValues(ValueMapperWithKey mapper, + ConfiguredMaterialized> materialized); + @Override ImprovedKTable mapValues(ValueMapperWithKey mapper, Named named, Materialized> materialized); + ImprovedKTable mapValues(ValueMapperWithKey mapper, Named named, + ConfiguredMaterialized> materialized); + @Override ImprovedKStream toStream(); @@ -129,11 +153,20 @@ ImprovedKTable transformValues( ValueTransformerWithKeySupplier transformerSupplier, Materialized> materialized, String... stateStoreNames); + ImprovedKTable transformValues( + ValueTransformerWithKeySupplier transformerSupplier, + ConfiguredMaterialized> materialized, String... stateStoreNames); + @Override ImprovedKTable transformValues( ValueTransformerWithKeySupplier transformerSupplier, Materialized> materialized, Named named, String... stateStoreNames); + ImprovedKTable transformValues( + ValueTransformerWithKeySupplier transformerSupplier, + ConfiguredMaterialized> materialized, Named named, + String... stateStoreNames); + @Override ImprovedKGroupedTable groupBy(KeyValueMapper> selector); @@ -152,10 +185,16 @@ ImprovedKTable join(KTable other, ValueJoiner ImprovedKTable join(KTable other, ValueJoiner joiner, Materialized> materialized); + ImprovedKTable join(KTable other, ValueJoiner joiner, + ConfiguredMaterialized> materialized); + @Override ImprovedKTable join(KTable other, ValueJoiner joiner, Named named, Materialized> materialized); + ImprovedKTable join(KTable other, ValueJoiner joiner, + Named named, ConfiguredMaterialized> materialized); + @Override ImprovedKTable leftJoin(KTable other, ValueJoiner joiner); @@ -170,11 +209,19 @@ ImprovedKTable leftJoin(KTable other, ValueJoiner joiner, Materialized> materialized); + ImprovedKTable leftJoin(KTable other, + ValueJoiner joiner, + ConfiguredMaterialized> materialized); + @Override ImprovedKTable leftJoin(KTable other, ValueJoiner joiner, Named named, Materialized> materialized); + ImprovedKTable leftJoin(KTable other, + ValueJoiner joiner, + Named named, ConfiguredMaterialized> materialized); + @Override ImprovedKTable outerJoin(KTable other, ValueJoiner joiner); @@ -189,11 +236,19 @@ ImprovedKTable outerJoin(KTable other, ValueJoiner joiner, Materialized> materialized); + ImprovedKTable outerJoin(KTable other, + ValueJoiner joiner, + ConfiguredMaterialized> materialized); + @Override ImprovedKTable outerJoin(KTable other, ValueJoiner joiner, Named named, Materialized> materialized); + ImprovedKTable outerJoin(KTable other, + ValueJoiner joiner, + Named named, ConfiguredMaterialized> materialized); + @Override ImprovedKTable join(KTable other, Function foreignKeyExtractor, ValueJoiner joiner); @@ -210,15 +265,26 @@ ImprovedKTable join(KTable other, Function fo ImprovedKTable join(KTable other, Function foreignKeyExtractor, ValueJoiner joiner, Materialized> materialized); + ImprovedKTable join(KTable other, Function foreignKeyExtractor, + ValueJoiner joiner, ConfiguredMaterialized> materialized); + @Override ImprovedKTable join(KTable other, Function foreignKeyExtractor, ValueJoiner joiner, Named named, Materialized> materialized); + ImprovedKTable join(KTable other, Function foreignKeyExtractor, + ValueJoiner joiner, Named named, + ConfiguredMaterialized> materialized); + @Override ImprovedKTable join(KTable other, Function foreignKeyExtractor, ValueJoiner joiner, TableJoined tableJoined, Materialized> materialized); + ImprovedKTable join(KTable other, Function foreignKeyExtractor, + ValueJoiner joiner, TableJoined tableJoined, + ConfiguredMaterialized> materialized); + @Override ImprovedKTable leftJoin(KTable other, Function foreignKeyExtractor, ValueJoiner joiner); @@ -235,12 +301,23 @@ ImprovedKTable leftJoin(KTable other, Function ImprovedKTable leftJoin(KTable other, Function foreignKeyExtractor, ValueJoiner joiner, Materialized> materialized); + ImprovedKTable leftJoin(KTable other, Function foreignKeyExtractor, + ValueJoiner joiner, ConfiguredMaterialized> materialized); + @Override ImprovedKTable leftJoin(KTable other, Function foreignKeyExtractor, ValueJoiner joiner, Named named, Materialized> materialized); + ImprovedKTable leftJoin(KTable other, Function foreignKeyExtractor, + ValueJoiner joiner, Named named, + ConfiguredMaterialized> materialized); + @Override ImprovedKTable leftJoin(KTable other, Function foreignKeyExtractor, ValueJoiner joiner, TableJoined tableJoined, Materialized> materialized); + + ImprovedKTable leftJoin(KTable other, Function foreignKeyExtractor, + ValueJoiner joiner, TableJoined tableJoined, + ConfiguredMaterialized> materialized); } diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedKTableImpl.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedKTableImpl.java index 931f1911..f7f21249 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedKTableImpl.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedKTableImpl.java @@ -68,12 +68,24 @@ public ImprovedKTable filter(final Predicate predica return this.context.wrap(this.wrapped.filter(predicate, materialized)); } + @Override + public ImprovedKTable filter(final Predicate predicate, + final ConfiguredMaterialized> materialized) { + return this.filter(predicate, materialized.configure(this.context.getConfigurator())); + } + @Override public ImprovedKTable filter(final Predicate predicate, final Named named, final Materialized> materialized) { return this.context.wrap(this.wrapped.filter(predicate, named, materialized)); } + @Override + public ImprovedKTable filter(final Predicate predicate, final Named named, + final ConfiguredMaterialized> materialized) { + return this.filter(predicate, named, materialized.configure(this.context.getConfigurator())); + } + @Override public ImprovedKTable filterNot(final Predicate predicate) { return this.context.wrap(this.wrapped.filterNot(predicate)); @@ -90,12 +102,24 @@ public ImprovedKTable filterNot(final Predicate pred return this.context.wrap(this.wrapped.filterNot(predicate, materialized)); } + @Override + public ImprovedKTable filterNot(final Predicate predicate, + final ConfiguredMaterialized> materialized) { + return this.filterNot(predicate, materialized.configure(this.context.getConfigurator())); + } + @Override public ImprovedKTable filterNot(final Predicate predicate, final Named named, final Materialized> materialized) { return this.context.wrap(this.wrapped.filterNot(predicate, materialized)); } + @Override + public ImprovedKTable filterNot(final Predicate predicate, final Named named, + final ConfiguredMaterialized> materialized) { + return this.filterNot(predicate, named, materialized.configure(this.context.getConfigurator())); + } + @Override public ImprovedKTable mapValues(final ValueMapper mapper) { return this.context.wrap(this.wrapped.mapValues(mapper)); @@ -123,24 +147,48 @@ public ImprovedKTable mapValues(final ValueMapper ImprovedKTable mapValues(final ValueMapper mapper, + final ConfiguredMaterialized> materialized) { + return this.mapValues(mapper, materialized.configure(this.context.getConfigurator())); + } + @Override public ImprovedKTable mapValues(final ValueMapper mapper, final Named named, final Materialized> materialized) { return this.context.wrap(this.wrapped.mapValues(mapper, named, materialized)); } + @Override + public ImprovedKTable mapValues(final ValueMapper mapper, final Named named, + final ConfiguredMaterialized> materialized) { + return this.mapValues(mapper, named, materialized.configure(this.context.getConfigurator())); + } + @Override public ImprovedKTable mapValues(final ValueMapperWithKey mapper, final Materialized> materialized) { return this.context.wrap(this.wrapped.mapValues(mapper, materialized)); } + @Override + public ImprovedKTable mapValues(final ValueMapperWithKey mapper, + final ConfiguredMaterialized> materialized) { + return this.mapValues(mapper, materialized.configure(this.context.getConfigurator())); + } + @Override public ImprovedKTable mapValues(final ValueMapperWithKey mapper, final Named named, final Materialized> materialized) { return this.context.wrap(this.wrapped.mapValues(mapper, named, materialized)); } + @Override + public ImprovedKTable mapValues(final ValueMapperWithKey mapper, + final Named named, final ConfiguredMaterialized> materialized) { + return this.mapValues(mapper, named, materialized.configure(this.context.getConfigurator())); + } + @Override public ImprovedKStream toStream() { return this.context.wrap(this.wrapped.toStream()); @@ -189,6 +237,13 @@ public ImprovedKTable transformValues( return this.context.wrap(this.wrapped.transformValues(transformerSupplier, materialized, stateStoreNames)); } + @Override + public ImprovedKTable transformValues( + final ValueTransformerWithKeySupplier transformerSupplier, + final ConfiguredMaterialized> materialized, final String... stateStoreNames) { + return this.transformValues(transformerSupplier, materialized.configure(this.context.getConfigurator()), stateStoreNames); + } + @Override public ImprovedKTable transformValues( final ValueTransformerWithKeySupplier transformerSupplier, @@ -198,6 +253,15 @@ public ImprovedKTable transformValues( this.wrapped.transformValues(transformerSupplier, materialized, named, stateStoreNames)); } + @Override + public ImprovedKTable transformValues( + final ValueTransformerWithKeySupplier transformerSupplier, + final ConfiguredMaterialized> materialized, final Named named, + final String... stateStoreNames) { + return this.transformValues(transformerSupplier, materialized.configure(this.context.getConfigurator()), named, + stateStoreNames); + } + @Override public ImprovedKGroupedTable groupBy( final KeyValueMapper> selector) { @@ -232,6 +296,13 @@ public ImprovedKTable join(final KTable other, return this.context.wrap(this.wrapped.join(otherTable, joiner, materialized)); } + @Override + public ImprovedKTable join(final KTable other, + final ValueJoiner joiner, + final ConfiguredMaterialized> materialized) { + return this.join(other, joiner, materialized.configure(this.context.getConfigurator())); + } + @Override public ImprovedKTable join(final KTable other, final ValueJoiner joiner, final Named named, @@ -240,6 +311,13 @@ public ImprovedKTable join(final KTable other, return this.context.wrap(this.wrapped.join(otherTable, joiner, materialized)); } + @Override + public ImprovedKTable join(final KTable other, + final ValueJoiner joiner, final Named named, + final ConfiguredMaterialized> materialized) { + return this.join(other, joiner, named, materialized.configure(this.context.getConfigurator())); + } + @Override public ImprovedKTable leftJoin(final KTable other, final ValueJoiner joiner) { @@ -262,6 +340,13 @@ public ImprovedKTable leftJoin(final KTable other, return this.context.wrap(this.wrapped.leftJoin(otherTable, joiner, materialized)); } + @Override + public ImprovedKTable leftJoin(final KTable other, + final ValueJoiner joiner, + final ConfiguredMaterialized> materialized) { + return this.leftJoin(other, joiner, materialized.configure(this.context.getConfigurator())); + } + @Override public ImprovedKTable leftJoin(final KTable other, final ValueJoiner joiner, final Named named, @@ -270,6 +355,13 @@ public ImprovedKTable leftJoin(final KTable other, return this.context.wrap(this.wrapped.leftJoin(otherTable, joiner, materialized)); } + @Override + public ImprovedKTable leftJoin(final KTable other, + final ValueJoiner joiner, final Named named, + final ConfiguredMaterialized> materialized) { + return this.leftJoin(other, joiner, named, materialized.configure(this.context.getConfigurator())); + } + @Override public ImprovedKTable outerJoin(final KTable other, final ValueJoiner joiner) { @@ -292,6 +384,13 @@ public ImprovedKTable outerJoin(final KTable other, return this.context.wrap(this.wrapped.outerJoin(otherTable, joiner, materialized)); } + @Override + public ImprovedKTable outerJoin(final KTable other, + final ValueJoiner joiner, + final ConfiguredMaterialized> materialized) { + return this.outerJoin(other, joiner, materialized.configure(this.context.getConfigurator())); + } + @Override public ImprovedKTable outerJoin(final KTable other, final ValueJoiner joiner, final Named named, @@ -300,6 +399,13 @@ public ImprovedKTable outerJoin(final KTable other, return this.context.wrap(this.wrapped.outerJoin(otherTable, joiner, materialized)); } + @Override + public ImprovedKTable outerJoin(final KTable other, + final ValueJoiner joiner, final Named named, + final ConfiguredMaterialized> materialized) { + return this.outerJoin(other, joiner, named, materialized.configure(this.context.getConfigurator())); + } + @Override public ImprovedKTable join(final KTable other, final Function foreignKeyExtractor, @@ -332,6 +438,12 @@ public ImprovedKTable join(final KTable other, return this.context.wrap(this.wrapped.join(otherTable, foreignKeyExtractor, joiner, materialized)); } + @Override + public ImprovedKTable join(final KTable other, final Function foreignKeyExtractor, + final ValueJoiner joiner, final ConfiguredMaterialized> materialized) { + return this.join(other, foreignKeyExtractor, joiner, materialized.configure(this.context.getConfigurator())); + } + @Override public ImprovedKTable join(final KTable other, final Function foreignKeyExtractor, @@ -341,6 +453,13 @@ public ImprovedKTable join(final KTable other, return this.context.wrap(this.wrapped.join(otherTable, foreignKeyExtractor, joiner, named, materialized)); } + @Override + public ImprovedKTable join(final KTable other, final Function foreignKeyExtractor, + final ValueJoiner joiner, final Named named, + final ConfiguredMaterialized> materialized) { + return this.join(other, foreignKeyExtractor, joiner, named, materialized.configure(this.context.getConfigurator())); + } + @Override public ImprovedKTable join(final KTable other, final Function foreignKeyExtractor, @@ -350,6 +469,13 @@ public ImprovedKTable join(final KTable other, return this.context.wrap(this.wrapped.join(otherTable, foreignKeyExtractor, joiner, tableJoined, materialized)); } + @Override + public ImprovedKTable join(final KTable other, final Function foreignKeyExtractor, + final ValueJoiner joiner, final TableJoined tableJoined, + final ConfiguredMaterialized> materialized) { + return this.join(other, foreignKeyExtractor, joiner, tableJoined, materialized.configure(this.context.getConfigurator())); + } + @Override public ImprovedKTable leftJoin(final KTable other, final Function foreignKeyExtractor, @@ -382,6 +508,12 @@ public ImprovedKTable leftJoin(final KTable other, return this.context.wrap(this.wrapped.leftJoin(otherTable, foreignKeyExtractor, joiner, materialized)); } + @Override + public ImprovedKTable leftJoin(final KTable other, final Function foreignKeyExtractor, + final ValueJoiner joiner, final ConfiguredMaterialized> materialized) { + return this.leftJoin(other, foreignKeyExtractor, joiner, materialized.configure(this.context.getConfigurator())); + } + @Override public ImprovedKTable leftJoin(final KTable other, final Function foreignKeyExtractor, @@ -391,6 +523,13 @@ public ImprovedKTable leftJoin(final KTable other, return this.context.wrap(this.wrapped.leftJoin(otherTable, foreignKeyExtractor, joiner, named, materialized)); } + @Override + public ImprovedKTable leftJoin(final KTable other, final Function foreignKeyExtractor, + final ValueJoiner joiner, final Named named, + final ConfiguredMaterialized> materialized) { + return this.leftJoin(other, foreignKeyExtractor, joiner, named, materialized.configure(this.context.getConfigurator())); + } + @Override public ImprovedKTable leftJoin(final KTable other, final Function foreignKeyExtractor, @@ -401,6 +540,14 @@ public ImprovedKTable leftJoin(final KTable other, this.wrapped.leftJoin(otherTable, foreignKeyExtractor, joiner, tableJoined, materialized)); } + @Override + public ImprovedKTable leftJoin(final KTable other, final Function foreignKeyExtractor, + final ValueJoiner joiner, final TableJoined tableJoined, + final ConfiguredMaterialized> materialized) { + return this.leftJoin(other, foreignKeyExtractor, joiner, tableJoined, + materialized.configure(this.context.getConfigurator())); + } + @Override public String queryableStoreName() { return this.wrapped.queryableStoreName(); diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedSessionWindowedCogroupedKStream.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedSessionWindowedCogroupedKStream.java index 19a4c199..ea5002fa 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedSessionWindowedCogroupedKStream.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedSessionWindowedCogroupedKStream.java @@ -46,7 +46,13 @@ ImprovedKTable, VOut> aggregate(Initializer initializer, Merge ImprovedKTable, VOut> aggregate(Initializer initializer, Merger sessionMerger, Materialized> materialized); + ImprovedKTable, VOut> aggregate(Initializer initializer, Merger sessionMerger, + ConfiguredMaterialized> materialized); + @Override ImprovedKTable, VOut> aggregate(Initializer initializer, Merger sessionMerger, Named named, Materialized> materialized); + + ImprovedKTable, VOut> aggregate(Initializer initializer, Merger sessionMerger, + Named named, ConfiguredMaterialized> materialized); } diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedSessionWindowedCogroupedStreamImpl.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedSessionWindowedCogroupedStreamImpl.java index acad2a7b..ec73b63b 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedSessionWindowedCogroupedStreamImpl.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedSessionWindowedCogroupedStreamImpl.java @@ -61,10 +61,22 @@ public ImprovedKTable, V> aggregate(final Initializer initializer return this.context.wrap(this.wrapped.aggregate(initializer, sessionMerger, materialized)); } + @Override + public ImprovedKTable, V> aggregate(final Initializer initializer, final Merger sessionMerger, + final ConfiguredMaterialized> materialized) { + return this.aggregate(initializer, sessionMerger, materialized.configure(this.context.getConfigurator())); + } + @Override public ImprovedKTable, V> aggregate(final Initializer initializer, final Merger sessionMerger, final Named named, final Materialized> materialized) { return this.context.wrap(this.wrapped.aggregate(initializer, sessionMerger, materialized)); } + + @Override + public ImprovedKTable, V> aggregate(final Initializer initializer, final Merger sessionMerger, + final Named named, final ConfiguredMaterialized> materialized) { + return this.aggregate(initializer, sessionMerger, named, materialized.configure(this.context.getConfigurator())); + } } diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedSessionWindowedKStream.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedSessionWindowedKStream.java index 83ed6b08..896ffffe 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedSessionWindowedKStream.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedSessionWindowedKStream.java @@ -47,10 +47,15 @@ public interface ImprovedSessionWindowedKStream extends SessionWindowedKSt @Override ImprovedKTable, Long> count(Materialized> materialized); + ImprovedKTable, Long> count(ConfiguredMaterialized> materialized); + @Override ImprovedKTable, Long> count(Named named, Materialized> materialized); + ImprovedKTable, Long> count(Named named, + ConfiguredMaterialized> materialized); + @Override ImprovedKTable, VR> aggregate(Initializer initializer, Aggregator aggregator, @@ -66,12 +71,22 @@ ImprovedKTable, VR> aggregate(Initializer initializer, Aggregator aggregator, Merger sessionMerger, Materialized> materialized); + ImprovedKTable, VR> aggregate(Initializer initializer, + Aggregator aggregator, + Merger sessionMerger, + ConfiguredMaterialized> materialized); + @Override ImprovedKTable, VR> aggregate(Initializer initializer, Aggregator aggregator, Merger sessionMerger, Named named, Materialized> materialized); + ImprovedKTable, VR> aggregate(Initializer initializer, + Aggregator aggregator, + Merger sessionMerger, Named named, + ConfiguredMaterialized> materialized); + @Override ImprovedKTable, V> reduce(Reducer reducer); @@ -82,10 +97,16 @@ ImprovedKTable, VR> aggregate(Initializer initializer, ImprovedKTable, V> reduce(Reducer reducer, Materialized> materialized); + ImprovedKTable, V> reduce(Reducer reducer, + ConfiguredMaterialized> materialized); + @Override ImprovedKTable, V> reduce(Reducer reducer, Named named, Materialized> materialized); + ImprovedKTable, V> reduce(Reducer reducer, Named named, + ConfiguredMaterialized> materialized); + @Override ImprovedSessionWindowedKStream emitStrategy(EmitStrategy emitStrategy); } diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedSessionWindowedStreamImpl.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedSessionWindowedStreamImpl.java index 57b03035..b330d075 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedSessionWindowedStreamImpl.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedSessionWindowedStreamImpl.java @@ -60,12 +60,24 @@ public ImprovedKTable, Long> count( return this.context.wrap(this.wrapped.count(materialized)); } + @Override + public ImprovedKTable, Long> count( + final ConfiguredMaterialized> materialized) { + return this.count(materialized.configure(this.context.getConfigurator())); + } + @Override public ImprovedKTable, Long> count(final Named named, final Materialized> materialized) { return this.context.wrap(this.wrapped.count(named, materialized)); } + @Override + public ImprovedKTable, Long> count(final Named named, + final ConfiguredMaterialized> materialized) { + return this.count(named, materialized.configure(this.context.getConfigurator())); + } + @Override public ImprovedKTable, VR> aggregate(final Initializer initializer, final Aggregator aggregator, final Merger sessionMerger) { @@ -86,6 +98,13 @@ public ImprovedKTable, VR> aggregate(final Initializer init return this.context.wrap(this.wrapped.aggregate(initializer, aggregator, sessionMerger, materialized)); } + @Override + public ImprovedKTable, VR> aggregate(final Initializer initializer, + final Aggregator aggregator, final Merger sessionMerger, + final ConfiguredMaterialized> materialized) { + return this.aggregate(initializer, aggregator, sessionMerger, materialized.configure(this.context.getConfigurator())); + } + @Override public ImprovedKTable, VR> aggregate(final Initializer initializer, final Aggregator aggregator, final Merger sessionMerger, @@ -94,6 +113,14 @@ public ImprovedKTable, VR> aggregate(final Initializer init return this.context.wrap(this.wrapped.aggregate(initializer, aggregator, sessionMerger, materialized)); } + @Override + public ImprovedKTable, VR> aggregate(final Initializer initializer, + final Aggregator aggregator, final Merger sessionMerger, final Named named, + final ConfiguredMaterialized> materialized) { + return this.aggregate(initializer, aggregator, sessionMerger, named, + materialized.configure(this.context.getConfigurator())); + } + @Override public ImprovedKTable, V> reduce(final Reducer reducer) { return this.context.wrap(this.wrapped.reduce(reducer)); @@ -110,12 +137,24 @@ public ImprovedKTable, V> reduce(final Reducer reducer, return this.context.wrap(this.wrapped.reduce(reducer, materialized)); } + @Override + public ImprovedKTable, V> reduce(final Reducer reducer, + final ConfiguredMaterialized> materialized) { + return this.reduce(reducer, materialized.configure(this.context.getConfigurator())); + } + @Override public ImprovedKTable, V> reduce(final Reducer reducer, final Named named, final Materialized> materialized) { return this.context.wrap(this.wrapped.reduce(reducer, named, materialized)); } + @Override + public ImprovedKTable, V> reduce(final Reducer reducer, final Named named, + final ConfiguredMaterialized> materialized) { + return this.reduce(reducer, named, materialized.configure(this.context.getConfigurator())); + } + @Override public ImprovedSessionWindowedKStream emitStrategy(final EmitStrategy emitStrategy) { return this.context.wrap(this.wrapped.emitStrategy(emitStrategy)); diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedTimeWindowedCogroupedKStream.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedTimeWindowedCogroupedKStream.java index 89a12ae1..cbd90455 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedTimeWindowedCogroupedKStream.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedTimeWindowedCogroupedKStream.java @@ -44,7 +44,13 @@ public interface ImprovedTimeWindowedCogroupedKStream extends TimeWindo ImprovedKTable, VOut> aggregate(Initializer initializer, Materialized> materialized); + ImprovedKTable, VOut> aggregate(Initializer initializer, + ConfiguredMaterialized> materialized); + @Override ImprovedKTable, VOut> aggregate(Initializer initializer, Named named, Materialized> materialized); + + ImprovedKTable, VOut> aggregate(Initializer initializer, Named named, + ConfiguredMaterialized> materialized); } diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedTimeWindowedCogroupedStreamImpl.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedTimeWindowedCogroupedStreamImpl.java index a7239b1c..752366b5 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedTimeWindowedCogroupedStreamImpl.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedTimeWindowedCogroupedStreamImpl.java @@ -56,9 +56,21 @@ public ImprovedKTable, V> aggregate(final Initializer initializer return this.context.wrap(this.wrapped.aggregate(initializer, materialized)); } + @Override + public ImprovedKTable, V> aggregate(final Initializer initializer, + final ConfiguredMaterialized> materialized) { + return this.aggregate(initializer, materialized.configure(this.context.getConfigurator())); + } + @Override public ImprovedKTable, V> aggregate(final Initializer initializer, final Named named, final Materialized> materialized) { return this.context.wrap(this.wrapped.aggregate(initializer, named, materialized)); } + + @Override + public ImprovedKTable, V> aggregate(final Initializer initializer, final Named named, + final ConfiguredMaterialized> materialized) { + return this.aggregate(initializer, named, materialized.configure(this.context.getConfigurator())); + } } diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedTimeWindowedKStream.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedTimeWindowedKStream.java index 8e60053b..e57ba7cc 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedTimeWindowedKStream.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedTimeWindowedKStream.java @@ -46,10 +46,15 @@ public interface ImprovedTimeWindowedKStream extends TimeWindowedKStream, Long> count(Materialized> materialized); + ImprovedKTable, Long> count(ConfiguredMaterialized> materialized); + @Override ImprovedKTable, Long> count(Named named, Materialized> materialized); + ImprovedKTable, Long> count(Named named, + ConfiguredMaterialized> materialized); + @Override ImprovedKTable, VR> aggregate(Initializer initializer, Aggregator aggregator); @@ -64,11 +69,19 @@ ImprovedKTable, VR> aggregate(Initializer initializer, Aggregator aggregator, Materialized> materialized); + ImprovedKTable, VR> aggregate(Initializer initializer, + Aggregator aggregator, + ConfiguredMaterialized> materialized); + @Override ImprovedKTable, VR> aggregate(Initializer initializer, Aggregator aggregator, Named named, Materialized> materialized); + ImprovedKTable, VR> aggregate(Initializer initializer, + Aggregator aggregator, + Named named, ConfiguredMaterialized> materialized); + @Override ImprovedKTable, V> reduce(Reducer reducer); @@ -79,10 +92,16 @@ ImprovedKTable, VR> aggregate(Initializer initializer, ImprovedKTable, V> reduce(Reducer reducer, Materialized> materialized); + ImprovedKTable, V> reduce(Reducer reducer, + ConfiguredMaterialized> materialized); + @Override ImprovedKTable, V> reduce(Reducer reducer, Named named, Materialized> materialized); + ImprovedKTable, V> reduce(Reducer reducer, Named named, + ConfiguredMaterialized> materialized); + @Override ImprovedTimeWindowedKStream emitStrategy(EmitStrategy emitStrategy); } diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedTimeWindowedStreamImpl.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedTimeWindowedStreamImpl.java index 46f1a36e..0c858628 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedTimeWindowedStreamImpl.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedTimeWindowedStreamImpl.java @@ -59,12 +59,24 @@ public ImprovedKTable, Long> count( return this.context.wrap(this.wrapped.count(materialized)); } + @Override + public ImprovedKTable, Long> count( + final ConfiguredMaterialized> materialized) { + return this.count(materialized.configure(this.context.getConfigurator())); + } + @Override public ImprovedKTable, Long> count(final Named named, final Materialized> materialized) { return this.context.wrap(this.wrapped.count(named, materialized)); } + @Override + public ImprovedKTable, Long> count(final Named named, + final ConfiguredMaterialized> materialized) { + return this.count(named, materialized.configure(this.context.getConfigurator())); + } + @Override public ImprovedKTable, VR> aggregate(final Initializer initializer, final Aggregator aggregator) { @@ -84,6 +96,13 @@ public ImprovedKTable, VR> aggregate(final Initializer init return this.context.wrap(this.wrapped.aggregate(initializer, aggregator, materialized)); } + @Override + public ImprovedKTable, VR> aggregate(final Initializer initializer, + final Aggregator aggregator, + final ConfiguredMaterialized> materialized) { + return this.aggregate(initializer, aggregator, materialized.configure(this.context.getConfigurator())); + } + @Override public ImprovedKTable, VR> aggregate(final Initializer initializer, final Aggregator aggregator, final Named named, @@ -91,6 +110,13 @@ public ImprovedKTable, VR> aggregate(final Initializer init return this.context.wrap(this.wrapped.aggregate(initializer, aggregator, materialized)); } + @Override + public ImprovedKTable, VR> aggregate(final Initializer initializer, + final Aggregator aggregator, final Named named, + final ConfiguredMaterialized> materialized) { + return this.aggregate(initializer, aggregator, named, materialized.configure(this.context.getConfigurator())); + } + @Override public ImprovedKTable, V> reduce(final Reducer reducer) { return this.context.wrap(this.wrapped.reduce(reducer)); @@ -107,12 +133,24 @@ public ImprovedKTable, V> reduce(final Reducer reducer, return this.context.wrap(this.wrapped.reduce(reducer, materialized)); } + @Override + public ImprovedKTable, V> reduce(final Reducer reducer, + final ConfiguredMaterialized> materialized) { + return this.reduce(reducer, materialized.configure(this.context.getConfigurator())); + } + @Override public ImprovedKTable, V> reduce(final Reducer reducer, final Named named, final Materialized> materialized) { return this.context.wrap(this.wrapped.reduce(reducer, named, materialized)); } + @Override + public ImprovedKTable, V> reduce(final Reducer reducer, final Named named, + final ConfiguredMaterialized> materialized) { + return this.reduce(reducer, named, materialized.configure(this.context.getConfigurator())); + } + @Override public ImprovedTimeWindowedKStream emitStrategy(final EmitStrategy emitStrategy) { return this.context.wrap(this.wrapped.emitStrategy(emitStrategy));