Skip to content

Commit

Permalink
Add error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
philipp94831 committed Jan 27, 2025
1 parent 5377422 commit a5f272e
Show file tree
Hide file tree
Showing 19 changed files with 629 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* MIT License
*
* Copyright (c) 2025 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package com.bakdata.kafka;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import lombok.AccessLevel;
import lombok.RequiredArgsConstructor;
import lombok.With;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.DslStoreSuppliers;

@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
public class ConfiguredMaterialized<K, V, S extends StateStore> {

@With
private final Preconfigured<Serde<K>> keySerde;
@With
private final Preconfigured<Serde<V>> valueSerde;
private final String storeName;
@With
private final Duration retention;
@With
private final DslStoreSuppliers storeType;
private final Map<String, String> topicConfig;
private final boolean loggingEnabled;
private final boolean cachingEnabled;

public static <K, V, S extends StateStore> ConfiguredMaterialized<K, V, S> keySerde(
final Preconfigured<Serde<K>> keySerde) {
return with(keySerde, null);
}

public static <K, V, S extends StateStore> ConfiguredMaterialized<K, V, S> valueSerde(
final Preconfigured<Serde<V>> valueSerde) {
return with(null, valueSerde);
}

public static <K, V, S extends StateStore> ConfiguredMaterialized<K, V, S> with(
final Preconfigured<Serde<K>> keySerde,
final Preconfigured<Serde<V>> valueSerde) {
return new ConfiguredMaterialized<>(keySerde, valueSerde, null, null, null, new HashMap<>(), true, true);
}

public static <K, V, S extends StateStore> ConfiguredMaterialized<K, V, S> as(final String storeName) {
return new ConfiguredMaterialized<>(null, null, storeName, null, null, new HashMap<>(), true, true);
}

public static <K, V, S extends StateStore> ConfiguredMaterialized<K, V, S> as(
final DslStoreSuppliers storeSuppliers) {
return new ConfiguredMaterialized<>(null, null, null, null, storeSuppliers, new HashMap<>(), true, true);
}

Materialized<K, V, S> configure(final Configurator configurator) {
final Materialized<K, V, S> materialized = Materialized.<K, V, S>as(this.storeName)
.withKeySerde(this.configureKeySerde(configurator))
.withValueSerde(this.configuredValueSerde(configurator));
if (this.retention != null) {
materialized.withRetention(this.retention);
}
if (this.storeType != null) {
materialized.withStoreType(this.storeType);
}
if (this.loggingEnabled) {
materialized.withLoggingEnabled(this.topicConfig);
} else {
materialized.withLoggingDisabled();
}
if (this.cachingEnabled) {
materialized.withCachingEnabled();
} else {
materialized.withCachingDisabled();
}
return materialized;
}

private Serde<V> configuredValueSerde(final Configurator configurator) {
return this.valueSerde == null ? null : configurator.configureForValues(this.valueSerde);
}

private Serde<K> configureKeySerde(final Configurator configurator) {
return this.keySerde == null ? null : configurator.configureForKeys(this.keySerde);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,16 @@ <VIn> ImprovedCogroupedKStream<K, VOut> cogroup(KGroupedStream<K, VIn> groupedSt
ImprovedKTable<K, VOut> aggregate(Initializer<VOut> initializer,
Materialized<K, VOut, KeyValueStore<Bytes, byte[]>> materialized);

ImprovedKTable<K, VOut> aggregate(Initializer<VOut> initializer,
ConfiguredMaterialized<K, VOut, KeyValueStore<Bytes, byte[]>> materialized);

@Override
ImprovedKTable<K, VOut> aggregate(Initializer<VOut> initializer, Named named,
Materialized<K, VOut, KeyValueStore<Bytes, byte[]>> materialized);

ImprovedKTable<K, VOut> aggregate(Initializer<VOut> initializer, Named named,
ConfiguredMaterialized<K, VOut, KeyValueStore<Bytes, byte[]>> materialized);

@Override
<W extends Window> ImprovedTimeWindowedCogroupedKStream<K, VOut> windowedBy(Windows<W> windows);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,24 @@ public ImprovedKTable<K, V> aggregate(final Initializer<V> initializer,
return this.context.wrap(this.wrapped.aggregate(initializer, materialized));
}

@Override
public ImprovedKTable<K, V> aggregate(final Initializer<V> initializer,
final ConfiguredMaterialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
return this.aggregate(initializer, materialized.configure(this.context.getConfigurator()));
}

@Override
public ImprovedKTable<K, V> aggregate(final Initializer<V> initializer, final Named named,
final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
return this.context.wrap(this.wrapped.aggregate(initializer, named, materialized));
}

@Override
public ImprovedKTable<K, V> aggregate(final Initializer<V> initializer, final Named named,
final ConfiguredMaterialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
return this.aggregate(initializer, named, materialized.configure(this.context.getConfigurator()));
}

@Override
public <W extends Window> ImprovedTimeWindowedCogroupedKStream<K, V> windowedBy(final Windows<W> windows) {
return this.context.wrap(this.wrapped.windowedBy(windows));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,30 +48,47 @@ public interface ImprovedKGroupedStream<K, V> extends KGroupedStream<K, V> {
@Override
ImprovedKTable<K, Long> count(Materialized<K, Long, KeyValueStore<Bytes, byte[]>> materialized);

ImprovedKTable<K, Long> count(ConfiguredMaterialized<K, Long, KeyValueStore<Bytes, byte[]>> materialized);

@Override
ImprovedKTable<K, Long> count(Named named, Materialized<K, Long, KeyValueStore<Bytes, byte[]>> materialized);

ImprovedKTable<K, Long> count(Named named,
ConfiguredMaterialized<K, Long, KeyValueStore<Bytes, byte[]>> materialized);

@Override
ImprovedKTable<K, V> reduce(Reducer<V> reducer);

@Override
ImprovedKTable<K, V> reduce(Reducer<V> reducer, Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);

ImprovedKTable<K, V> reduce(Reducer<V> reducer,
ConfiguredMaterialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);

@Override
ImprovedKTable<K, V> reduce(Reducer<V> reducer, Named named,
Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);

ImprovedKTable<K, V> reduce(Reducer<V> reducer, Named named,
ConfiguredMaterialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);

@Override
<VR> ImprovedKTable<K, VR> aggregate(Initializer<VR> initializer, Aggregator<? super K, ? super V, VR> aggregator);

@Override
<VR> ImprovedKTable<K, VR> aggregate(Initializer<VR> initializer, Aggregator<? super K, ? super V, VR> aggregator,
Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

<VR> ImprovedKTable<K, VR> aggregate(Initializer<VR> initializer, Aggregator<? super K, ? super V, VR> aggregator,
ConfiguredMaterialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

@Override
<VR> ImprovedKTable<K, VR> aggregate(Initializer<VR> initializer, Aggregator<? super K, ? super V, VR> aggregator,
Named named, Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

<VR> ImprovedKTable<K, VR> aggregate(Initializer<VR> initializer, Aggregator<? super K, ? super V, VR> aggregator,
Named named, ConfiguredMaterialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

@Override
<W extends Window> ImprovedTimeWindowedKStream<K, V> windowedBy(Windows<W> windows);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,23 @@ public ImprovedKTable<K, Long> count(final Materialized<K, Long, KeyValueStore<B
return this.context.wrap(this.wrapped.count(materialized));
}

@Override
public ImprovedKTable<K, Long> count(final ConfiguredMaterialized<K, Long, KeyValueStore<Bytes, byte[]>> materialized) {
return this.count(materialized.configure(this.context.getConfigurator()));
}

@Override
public ImprovedKTable<K, Long> count(final Named named,
final Materialized<K, Long, KeyValueStore<Bytes, byte[]>> materialized) {
return this.context.wrap(this.wrapped.count(named, materialized));
}

@Override
public ImprovedKTable<K, Long> count(final Named named,
final ConfiguredMaterialized<K, Long, KeyValueStore<Bytes, byte[]>> materialized) {
return this.count(named, materialized.configure(this.context.getConfigurator()));
}

@Override
public ImprovedKTable<K, V> reduce(final Reducer<V> reducer) {
return this.context.wrap(this.wrapped.reduce(reducer));
Expand All @@ -80,12 +91,24 @@ public ImprovedKTable<K, V> reduce(final Reducer<V> reducer,
return this.context.wrap(this.wrapped.reduce(reducer, materialized));
}

@Override
public ImprovedKTable<K, V> reduce(final Reducer<V> reducer,
final ConfiguredMaterialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
return this.reduce(reducer, materialized.configure(this.context.getConfigurator()));
}

@Override
public ImprovedKTable<K, V> reduce(final Reducer<V> reducer, final Named named,
final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
return this.context.wrap(this.wrapped.reduce(reducer, named, materialized));
}

@Override
public ImprovedKTable<K, V> reduce(final Reducer<V> reducer, final Named named,
final ConfiguredMaterialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
return this.reduce(reducer, named, materialized.configure(this.context.getConfigurator()));
}

@Override
public <VR> ImprovedKTable<K, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> aggregator) {
Expand All @@ -99,13 +122,27 @@ public <VR> ImprovedKTable<K, VR> aggregate(final Initializer<VR> initializer,
return this.context.wrap(this.wrapped.aggregate(initializer, aggregator, materialized));
}

@Override
public <VR> ImprovedKTable<K, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> aggregator,
final ConfiguredMaterialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
return this.aggregate(initializer, aggregator, materialized.configure(this.context.getConfigurator()));
}

@Override
public <VR> ImprovedKTable<K, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> aggregator, final Named named,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
return this.context.wrap(this.wrapped.aggregate(initializer, aggregator, named, materialized));
}

@Override
public <VR> ImprovedKTable<K, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> aggregator, final Named named,
final ConfiguredMaterialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
return this.aggregate(initializer, aggregator, named, materialized.configure(this.context.getConfigurator()));
}

@Override
public <W extends Window> ImprovedTimeWindowedKStream<K, V> windowedBy(final Windows<W> windows) {
return this.context.wrap(this.wrapped.windowedBy(windows));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,14 @@ public interface ImprovedKGroupedTable<K, V> extends KGroupedTable<K, V> {
@Override
ImprovedKTable<K, Long> count(Materialized<K, Long, KeyValueStore<Bytes, byte[]>> materialized);

ImprovedKTable<K, Long> count(ConfiguredMaterialized<K, Long, KeyValueStore<Bytes, byte[]>> materialized);

@Override
ImprovedKTable<K, Long> count(Named named, Materialized<K, Long, KeyValueStore<Bytes, byte[]>> materialized);

ImprovedKTable<K, Long> count(Named named,
ConfiguredMaterialized<K, Long, KeyValueStore<Bytes, byte[]>> materialized);

@Override
ImprovedKTable<K, Long> count();

Expand All @@ -51,10 +56,16 @@ public interface ImprovedKGroupedTable<K, V> extends KGroupedTable<K, V> {
ImprovedKTable<K, V> reduce(Reducer<V> adder, Reducer<V> subtractor,
Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);

ImprovedKTable<K, V> reduce(Reducer<V> adder, Reducer<V> subtractor,
ConfiguredMaterialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);

@Override
ImprovedKTable<K, V> reduce(Reducer<V> adder, Reducer<V> subtractor, Named named,
Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);

ImprovedKTable<K, V> reduce(Reducer<V> adder, Reducer<V> subtractor, Named named,
ConfiguredMaterialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);

@Override
ImprovedKTable<K, V> reduce(Reducer<V> adder, Reducer<V> subtractor);

Expand All @@ -63,11 +74,19 @@ <VR> ImprovedKTable<K, VR> aggregate(Initializer<VR> initializer, Aggregator<? s
Aggregator<? super K, ? super V, VR> subtractor,
Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

<VR> ImprovedKTable<K, VR> aggregate(Initializer<VR> initializer, Aggregator<? super K, ? super V, VR> adder,
Aggregator<? super K, ? super V, VR> subtractor,
ConfiguredMaterialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

@Override
<VR> ImprovedKTable<K, VR> aggregate(Initializer<VR> initializer, Aggregator<? super K, ? super V, VR> adder,
Aggregator<? super K, ? super V, VR> subtractor, Named named,
Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

<VR> ImprovedKTable<K, VR> aggregate(Initializer<VR> initializer, Aggregator<? super K, ? super V, VR> adder,
Aggregator<? super K, ? super V, VR> subtractor, Named named,
ConfiguredMaterialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

@Override
<VR> ImprovedKTable<K, VR> aggregate(Initializer<VR> initializer, Aggregator<? super K, ? super V, VR> adder,
Aggregator<? super K, ? super V, VR> subtractor);
Expand Down
Loading

0 comments on commit a5f272e

Please sign in to comment.