Skip to content

Commit

Permalink
Add error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
philipp94831 committed Jan 27, 2025
1 parent 59591d5 commit 322eb93
Show file tree
Hide file tree
Showing 18 changed files with 974 additions and 222 deletions.
1 change: 1 addition & 0 deletions streams-bootstrap-core/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ dependencies {
version = "2.0.9"
) // required because other dependencies use Slf4j 1.x which is not properly resolved if this library is used in test scope
implementation(group = "org.jooq", name = "jool", version = "0.9.14")
implementation(group = "com.bakdata.kafka", name = "error-handling-core", version = "1.6.1-SNAPSHOT")

val junitVersion: String by project
testRuntimeOnly(group = "org.junit.jupiter", name = "junit-jupiter-engine", version = junitVersion)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* MIT License
*
* Copyright (c) 2025 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package com.bakdata.kafka;

import java.util.Map;
import org.apache.kafka.streams.kstream.Branched;
import org.apache.kafka.streams.kstream.BranchedKStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Predicate;

public interface ImprovedBranchedKStream<K, V> extends BranchedKStream<K, V> {

@Override
ImprovedBranchedKStream<K, V> branch(Predicate<? super K, ? super V> predicate);

@Override
ImprovedBranchedKStream<K, V> branch(Predicate<? super K, ? super V> predicate, Branched<K, V> branched);

@Override
Map<String, KStream<K, V>> defaultBranch();

@Override
Map<String, KStream<K, V>> defaultBranch(Branched<K, V> branched);

@Override
Map<String, KStream<K, V>> noDefaultBranch();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* MIT License
*
* Copyright (c) 2025 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package com.bakdata.kafka;

import java.util.Map;
import java.util.Map.Entry;
import java.util.stream.Collectors;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.streams.kstream.Branched;
import org.apache.kafka.streams.kstream.BranchedKStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Predicate;

@RequiredArgsConstructor
public class ImprovedBranchedKStreamImpl<K, V> implements ImprovedBranchedKStream<K, V> {

private final @NonNull BranchedKStream<K, V> wrapped;
private final @NonNull StreamsContext context;

@Override
public ImprovedBranchedKStream<K, V> branch(final Predicate<? super K, ? super V> predicate) {
return this.context.wrap(this.wrapped.branch(predicate));
}

@Override
public ImprovedBranchedKStream<K, V> branch(final Predicate<? super K, ? super V> predicate,
final Branched<K, V> branched) {
return this.context.wrap(this.wrapped.branch(predicate, branched));
}

@Override
public Map<String, KStream<K, V>> defaultBranch() {
return this.wrap(this.wrapped.defaultBranch());
}

@Override
public Map<String, KStream<K, V>> defaultBranch(final Branched<K, V> branched) {
return this.wrap(this.wrapped.defaultBranch(branched));
}

@Override
public Map<String, KStream<K, V>> noDefaultBranch() {
return this.wrap(this.wrapped.noDefaultBranch());
}

private Map<String, KStream<K, V>> wrap(final Map<String, ? extends KStream<K, V>> streamMap) {
return streamMap.entrySet()
.stream()
.collect(Collectors.toMap(Entry::getKey, this::wrapValue));
}

private ImprovedKStream<K, V> wrapValue(final Entry<String, ? extends KStream<K, V>> entry) {
return this.context.wrap(entry.getValue());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,43 +48,43 @@ class ImprovedCogroupedStreamImpl<K, V> implements ImprovedCogroupedKStream<K, V
@Override
public <VIn> ImprovedCogroupedKStream<K, V> cogroup(final KGroupedStream<K, VIn> groupedStream,
final Aggregator<? super K, ? super VIn, V> aggregator) {
return this.context.newCogroupedStream(this.wrapped.cogroup(groupedStream, aggregator));
return this.context.wrap(this.wrapped.cogroup(groupedStream, aggregator));
}

@Override
public ImprovedKTable<K, V> aggregate(final Initializer<V> initializer) {
return this.context.newTable(this.wrapped.aggregate(initializer));
return this.context.wrap(this.wrapped.aggregate(initializer));
}

@Override
public ImprovedKTable<K, V> aggregate(final Initializer<V> initializer, final Named named) {
return this.context.newTable(this.wrapped.aggregate(initializer, named));
return this.context.wrap(this.wrapped.aggregate(initializer, named));
}

@Override
public ImprovedKTable<K, V> aggregate(final Initializer<V> initializer,
final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
return this.context.newTable(this.wrapped.aggregate(initializer, materialized));
return this.context.wrap(this.wrapped.aggregate(initializer, materialized));
}

@Override
public ImprovedKTable<K, V> aggregate(final Initializer<V> initializer, final Named named,
final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
return this.context.newTable(this.wrapped.aggregate(initializer, named, materialized));
return this.context.wrap(this.wrapped.aggregate(initializer, named, materialized));
}

@Override
public <W extends Window> ImprovedTimeWindowedCogroupedKStream<K, V> windowedBy(final Windows<W> windows) {
return this.context.newTimeWindowedCogroupedStream(this.wrapped.windowedBy(windows));
return this.context.wrap(this.wrapped.windowedBy(windows));
}

@Override
public ImprovedTimeWindowedCogroupedKStream<K, V> windowedBy(final SlidingWindows windows) {
return this.context.newTimeWindowedCogroupedStream(this.wrapped.windowedBy(windows));
return this.context.wrap(this.wrapped.windowedBy(windows));
}

@Override
public ImprovedSessionWindowedCogroupedKStream<K, V> windowedBy(final SessionWindows windows) {
return this.context.newSessionWindowedCogroupedStream(this.wrapped.windowedBy(windows));
return this.context.wrap(this.wrapped.windowedBy(windows));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,79 +47,79 @@ class ImprovedKGroupedStreamImpl<K, V> implements ImprovedKGroupedStream<K, V> {

@Override
public ImprovedKTable<K, Long> count() {
return this.context.newTable(this.wrapped.count());
return this.context.wrap(this.wrapped.count());
}

@Override
public ImprovedKTable<K, Long> count(final Named named) {
return this.context.newTable(this.wrapped.count(named));
return this.context.wrap(this.wrapped.count(named));
}

@Override
public ImprovedKTable<K, Long> count(final Materialized<K, Long, KeyValueStore<Bytes, byte[]>> materialized) {
return this.context.newTable(this.wrapped.count(materialized));
return this.context.wrap(this.wrapped.count(materialized));
}

@Override
public ImprovedKTable<K, Long> count(final Named named,
final Materialized<K, Long, KeyValueStore<Bytes, byte[]>> materialized) {
return this.context.newTable(this.wrapped.count(named, materialized));
return this.context.wrap(this.wrapped.count(named, materialized));
}

@Override
public ImprovedKTable<K, V> reduce(final Reducer<V> reducer) {
return this.context.newTable(this.wrapped.reduce(reducer));
return this.context.wrap(this.wrapped.reduce(reducer));
}

@Override
public ImprovedKTable<K, V> reduce(final Reducer<V> reducer,
final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
return this.context.newTable(this.wrapped.reduce(reducer, materialized));
return this.context.wrap(this.wrapped.reduce(reducer, materialized));
}

@Override
public ImprovedKTable<K, V> reduce(final Reducer<V> reducer, final Named named,
final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
return this.context.newTable(this.wrapped.reduce(reducer, named, materialized));
return this.context.wrap(this.wrapped.reduce(reducer, named, materialized));
}

@Override
public <VR> ImprovedKTable<K, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> aggregator) {
return this.context.newTable(this.wrapped.aggregate(initializer, aggregator));
return this.context.wrap(this.wrapped.aggregate(initializer, aggregator));
}

@Override
public <VR> ImprovedKTable<K, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> aggregator,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
return this.context.newTable(this.wrapped.aggregate(initializer, aggregator, materialized));
return this.context.wrap(this.wrapped.aggregate(initializer, aggregator, materialized));
}

@Override
public <VR> ImprovedKTable<K, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> aggregator, final Named named,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
return this.context.newTable(this.wrapped.aggregate(initializer, aggregator, named, materialized));
return this.context.wrap(this.wrapped.aggregate(initializer, aggregator, named, materialized));
}

@Override
public <W extends Window> ImprovedTimeWindowedKStream<K, V> windowedBy(final Windows<W> windows) {
return this.context.newTimeWindowedStream(this.wrapped.windowedBy(windows));
return this.context.wrap(this.wrapped.windowedBy(windows));
}

@Override
public ImprovedTimeWindowedKStream<K, V> windowedBy(final SlidingWindows windows) {
return this.context.newTimeWindowedStream(this.wrapped.windowedBy(windows));
return this.context.wrap(this.wrapped.windowedBy(windows));
}

@Override
public ImprovedSessionWindowedKStream<K, V> windowedBy(final SessionWindows windows) {
return this.context.newSessionWindowedStream(this.wrapped.windowedBy(windows));
return this.context.wrap(this.wrapped.windowedBy(windows));
}

@Override
public <VOut> ImprovedCogroupedKStream<K, VOut> cogroup(final Aggregator<? super K, ? super V, VOut> aggregator) {
return this.context.newCogroupedStream(this.wrapped.cogroup(aggregator));
return this.context.wrap(this.wrapped.cogroup(aggregator));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,69 +43,69 @@ class ImprovedKGroupedTableImpl<K, V> implements ImprovedKGroupedTable<K, V> {

@Override
public ImprovedKTable<K, Long> count(final Materialized<K, Long, KeyValueStore<Bytes, byte[]>> materialized) {
return this.context.newTable(this.wrapped.count(materialized));
return this.context.wrap(this.wrapped.count(materialized));
}

@Override
public ImprovedKTable<K, Long> count(final Named named,
final Materialized<K, Long, KeyValueStore<Bytes, byte[]>> materialized) {
return this.context.newTable(this.wrapped.count(named, materialized));
return this.context.wrap(this.wrapped.count(named, materialized));
}

@Override
public ImprovedKTable<K, Long> count() {
return this.context.newTable(this.wrapped.count());
return this.context.wrap(this.wrapped.count());
}

@Override
public ImprovedKTable<K, Long> count(final Named named) {
return this.context.newTable(this.wrapped.count(named));
return this.context.wrap(this.wrapped.count(named));
}

@Override
public ImprovedKTable<K, V> reduce(final Reducer<V> adder, final Reducer<V> subtractor,
final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
return this.context.newTable(this.wrapped.reduce(adder, subtractor, materialized));
return this.context.wrap(this.wrapped.reduce(adder, subtractor, materialized));
}

@Override
public ImprovedKTable<K, V> reduce(final Reducer<V> adder, final Reducer<V> subtractor, final Named named,
final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
return this.context.newTable(this.wrapped.reduce(adder, subtractor, materialized));
return this.context.wrap(this.wrapped.reduce(adder, subtractor, materialized));
}

@Override
public ImprovedKTable<K, V> reduce(final Reducer<V> adder, final Reducer<V> subtractor) {
return this.context.newTable(this.wrapped.reduce(adder, subtractor));
return this.context.wrap(this.wrapped.reduce(adder, subtractor));
}

@Override
public <VR> ImprovedKTable<K, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> adder,
final Aggregator<? super K, ? super V, VR> subtractor,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
return this.context.newTable(this.wrapped.aggregate(initializer, adder, subtractor, materialized));
return this.context.wrap(this.wrapped.aggregate(initializer, adder, subtractor, materialized));
}

@Override
public <VR> ImprovedKTable<K, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> adder,
final Aggregator<? super K, ? super V, VR> subtractor, final Named named,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
return this.context.newTable(this.wrapped.aggregate(initializer, adder, subtractor, materialized));
return this.context.wrap(this.wrapped.aggregate(initializer, adder, subtractor, materialized));
}

@Override
public <VR> ImprovedKTable<K, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> adder,
final Aggregator<? super K, ? super V, VR> subtractor) {
return this.context.newTable(this.wrapped.aggregate(initializer, adder, subtractor));
return this.context.wrap(this.wrapped.aggregate(initializer, adder, subtractor));
}

@Override
public <VR> ImprovedKTable<K, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> adder,
final Aggregator<? super K, ? super V, VR> subtractor, final Named named) {
return this.context.newTable(this.wrapped.aggregate(initializer, adder, subtractor, named));
return this.context.wrap(this.wrapped.aggregate(initializer, adder, subtractor, named));
}
}
Loading

0 comments on commit 322eb93

Please sign in to comment.