Skip to content

Commit

Permalink
Add error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
philipp94831 committed Jan 27, 2025
1 parent d321b8c commit 5377422
Show file tree
Hide file tree
Showing 7 changed files with 256 additions and 47 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* MIT License
*
* Copyright (c) 2025 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package com.bakdata.kafka;

import lombok.AccessLevel;
import lombok.RequiredArgsConstructor;
import lombok.With;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.kstream.Repartitioned;
import org.apache.kafka.streams.processor.StreamPartitioner;

@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
public class ConfiguredRepartitioned<K, V> {

@With
private final Preconfigured<Serde<K>> keySerde;
@With
private final Preconfigured<Serde<V>> valueSerde;
@With
private final StreamPartitioner<K, V> streamPartitioner;
@With
private final String name;
private final Integer numberOfPartitions;

public static <K, V> ConfiguredRepartitioned<K, V> keySerde(final Preconfigured<Serde<K>> keySerde) {
return with(keySerde, null);
}

public static <K, V> ConfiguredRepartitioned<K, V> valueSerde(final Preconfigured<Serde<V>> valueSerde) {
return with(null, valueSerde);
}

public static <K, V> ConfiguredRepartitioned<K, V> with(final Preconfigured<Serde<K>> keySerde,
final Preconfigured<Serde<V>> valueSerde) {
return new ConfiguredRepartitioned<>(keySerde, valueSerde, null, null, null);
}

public static <K, V> ConfiguredRepartitioned<K, V> as(final String name) {
return new ConfiguredRepartitioned<>(null, null, null, name, null);
}

public static <K, V> ConfiguredRepartitioned<K, V> numberOfPartitions(final int numberOfPartitions) {
return new ConfiguredRepartitioned<>(null, null, null, null, numberOfPartitions);
}

public static <K, V> ConfiguredRepartitioned<K, V> streamPartitioner(final StreamPartitioner<K, V> partitioner) {
return new ConfiguredRepartitioned<>(null, null, partitioner, null, null);
}

public ConfiguredRepartitioned<K, V> withNumberOfPartitions(final int numberOfPartitions) {
return new ConfiguredRepartitioned<>(this.keySerde, this.valueSerde, this.streamPartitioner, this.name,
numberOfPartitions);
}

Repartitioned<K, V> configure(final Configurator configurator) {
return Repartitioned.<K, V>as(this.name)
.withKeySerde(this.configureKeySerde(configurator))
.withValueSerde(this.configuredValueSerde(configurator))
.withStreamPartitioner(this.streamPartitioner)
.withNumberOfPartitions(this.numberOfPartitions);
}

private Serde<V> configuredValueSerde(final Configurator configurator) {
return this.valueSerde == null ? null : configurator.configureForValues(this.valueSerde);
}

private Serde<K> configureKeySerde(final Configurator configurator) {
return this.keySerde == null ? null : configurator.configureForKeys(this.keySerde);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ class ImprovedCogroupedStreamImpl<K, V> implements ImprovedCogroupedKStream<K, V
@Override
public <VIn> ImprovedCogroupedKStream<K, V> cogroup(final KGroupedStream<K, VIn> groupedStream,
final Aggregator<? super K, ? super VIn, V> aggregator) {
return this.context.wrap(this.wrapped.cogroup(groupedStream, aggregator));
final KGroupedStream<K, VIn> other = StreamsContext.maybeUnwrap(groupedStream);
return this.context.wrap(this.wrapped.cogroup(other, aggregator));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@

package com.bakdata.kafka;

import lombok.AccessLevel;
import lombok.Getter;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.common.utils.Bytes;
Expand All @@ -42,6 +44,7 @@
@RequiredArgsConstructor
class ImprovedKGroupedStreamImpl<K, V> implements ImprovedKGroupedStream<K, V> {

@Getter(AccessLevel.PACKAGE)
private final @NonNull KGroupedStream<K, V> wrapped;
private final @NonNull StreamsContext context;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,8 @@ <VR> KErrorStream<K, V, K, VR> flatMapValuesCapturingErrors(
@Override
ImprovedKStream<K, V> repartition(Repartitioned<K, V> repartitioned);

ImprovedKStream<K, V> repartition(ConfiguredRepartitioned<K, V> repartitioned);

void toOutputTopic();

void toOutputTopic(Produced<K, V> produced);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
package com.bakdata.kafka;

import java.util.Arrays;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.common.utils.Bytes;
Expand Down Expand Up @@ -59,6 +61,7 @@
@RequiredArgsConstructor
class ImprovedKStreamImpl<K, V> implements ImprovedKStream<K, V> {

@Getter(AccessLevel.PACKAGE)
private final @NonNull KStream<K, V> wrapped;
private final @NonNull StreamsContext context;

Expand Down Expand Up @@ -385,12 +388,14 @@ public ImprovedBranchedKStream<K, V> split(final Named named) {

@Override
public ImprovedKStream<K, V> merge(final KStream<K, V> stream) {
return this.context.wrap(this.wrapped.merge(stream));
final KStream<K, V> other = StreamsContext.maybeUnwrap(stream);
return this.context.wrap(this.wrapped.merge(other));
}

@Override
public ImprovedKStream<K, V> merge(final KStream<K, V> stream, final Named named) {
return this.context.wrap(this.wrapped.merge(stream, named));
final KStream<K, V> other = StreamsContext.maybeUnwrap(stream);
return this.context.wrap(this.wrapped.merge(other, named));
}

@Override
Expand All @@ -413,6 +418,11 @@ public ImprovedKStream<K, V> repartition(final Repartitioned<K, V> repartitioned
return this.context.wrap(this.wrapped.repartition(repartitioned));
}

@Override
public ImprovedKStream<K, V> repartition(final ConfiguredRepartitioned<K, V> repartitioned) {
return this.repartition(repartitioned.configure(this.context.getConfigurator()));
}

@Override
public void to(final String topic) {
this.wrapped.to(topic);
Expand Down Expand Up @@ -523,132 +533,152 @@ public ImprovedKGroupedStream<K, V> groupByKey(final Grouped<K, V> grouped) {
@Override
public <VO, VR> ImprovedKStream<K, VR> join(final KStream<K, VO> otherStream,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, final JoinWindows windows) {
return this.context.wrap(this.wrapped.join(otherStream, joiner, windows));
final KStream<K, VO> other = StreamsContext.maybeUnwrap(otherStream);
return this.context.wrap(this.wrapped.join(other, joiner, windows));
}

@Override
public <VO, VR> ImprovedKStream<K, VR> join(final KStream<K, VO> otherStream,
final ValueJoinerWithKey<? super K, ? super V, ? super VO, ? extends VR> joiner,
final JoinWindows windows) {
return this.context.wrap(this.wrapped.join(otherStream, joiner, windows));
final KStream<K, VO> other = StreamsContext.maybeUnwrap(otherStream);
return this.context.wrap(this.wrapped.join(other, joiner, windows));
}

@Override
public <VO, VR> ImprovedKStream<K, VR> join(final KStream<K, VO> otherStream,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, final JoinWindows windows,
final StreamJoined<K, V, VO> streamJoined) {
return this.context.wrap(this.wrapped.join(otherStream, joiner, windows, streamJoined));
final KStream<K, VO> other = StreamsContext.maybeUnwrap(otherStream);
return this.context.wrap(this.wrapped.join(other, joiner, windows, streamJoined));
}

@Override
public <VO, VR> ImprovedKStream<K, VR> join(final KStream<K, VO> otherStream,
final ValueJoinerWithKey<? super K, ? super V, ? super VO, ? extends VR> joiner, final JoinWindows windows,
final StreamJoined<K, V, VO> streamJoined) {
return this.context.wrap(this.wrapped.join(otherStream, joiner, windows, streamJoined));
final KStream<K, VO> other = StreamsContext.maybeUnwrap(otherStream);
return this.context.wrap(this.wrapped.join(other, joiner, windows, streamJoined));
}

@Override
public <VO, VR> ImprovedKStream<K, VR> leftJoin(final KStream<K, VO> otherStream,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, final JoinWindows windows) {
return this.context.wrap(this.wrapped.leftJoin(otherStream, joiner, windows));
final KStream<K, VO> other = StreamsContext.maybeUnwrap(otherStream);
return this.context.wrap(this.wrapped.leftJoin(other, joiner, windows));
}

@Override
public <VO, VR> ImprovedKStream<K, VR> leftJoin(final KStream<K, VO> otherStream,
final ValueJoinerWithKey<? super K, ? super V, ? super VO, ? extends VR> joiner,
final JoinWindows windows) {
return this.context.wrap(this.wrapped.leftJoin(otherStream, joiner, windows));
final KStream<K, VO> other = StreamsContext.maybeUnwrap(otherStream);
return this.context.wrap(this.wrapped.leftJoin(other, joiner, windows));
}

@Override
public <VO, VR> ImprovedKStream<K, VR> leftJoin(final KStream<K, VO> otherStream,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, final JoinWindows windows,
final StreamJoined<K, V, VO> streamJoined) {
return this.context.wrap(this.wrapped.leftJoin(otherStream, joiner, windows, streamJoined));
final KStream<K, VO> other = StreamsContext.maybeUnwrap(otherStream);
return this.context.wrap(this.wrapped.leftJoin(other, joiner, windows, streamJoined));
}

@Override
public <VO, VR> ImprovedKStream<K, VR> leftJoin(final KStream<K, VO> otherStream,
final ValueJoinerWithKey<? super K, ? super V, ? super VO, ? extends VR> joiner, final JoinWindows windows,
final StreamJoined<K, V, VO> streamJoined) {
return this.context.wrap(this.wrapped.leftJoin(otherStream, joiner, windows, streamJoined));
final KStream<K, VO> other = StreamsContext.maybeUnwrap(otherStream);
return this.context.wrap(this.wrapped.leftJoin(other, joiner, windows, streamJoined));
}

@Override
public <VO, VR> ImprovedKStream<K, VR> outerJoin(final KStream<K, VO> otherStream,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, final JoinWindows windows) {
return this.context.wrap(this.wrapped.outerJoin(otherStream, joiner, windows));
final KStream<K, VO> other = StreamsContext.maybeUnwrap(otherStream);
return this.context.wrap(this.wrapped.outerJoin(other, joiner, windows));
}

@Override
public <VO, VR> ImprovedKStream<K, VR> outerJoin(final KStream<K, VO> otherStream,
final ValueJoinerWithKey<? super K, ? super V, ? super VO, ? extends VR> joiner,
final JoinWindows windows) {
return this.context.wrap(this.wrapped.outerJoin(otherStream, joiner, windows));
final KStream<K, VO> other = StreamsContext.maybeUnwrap(otherStream);
return this.context.wrap(this.wrapped.outerJoin(other, joiner, windows));
}

@Override
public <VO, VR> ImprovedKStream<K, VR> outerJoin(final KStream<K, VO> otherStream,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, final JoinWindows windows,
final StreamJoined<K, V, VO> streamJoined) {
return this.context.wrap(this.wrapped.outerJoin(otherStream, joiner, windows, streamJoined));
final KStream<K, VO> other = StreamsContext.maybeUnwrap(otherStream);
return this.context.wrap(this.wrapped.outerJoin(other, joiner, windows, streamJoined));
}

@Override
public <VO, VR> ImprovedKStream<K, VR> outerJoin(final KStream<K, VO> otherStream,
final ValueJoinerWithKey<? super K, ? super V, ? super VO, ? extends VR> joiner, final JoinWindows windows,
final StreamJoined<K, V, VO> streamJoined) {
return this.context.wrap(this.wrapped.outerJoin(otherStream, joiner, windows, streamJoined));
final KStream<K, VO> other = StreamsContext.maybeUnwrap(otherStream);
return this.context.wrap(this.wrapped.outerJoin(other, joiner, windows, streamJoined));
}

@Override
public <VT, VR> ImprovedKStream<K, VR> join(final KTable<K, VT> table,
final ValueJoiner<? super V, ? super VT, ? extends VR> joiner) {
return this.context.wrap(this.wrapped.join(table, joiner));
final KTable<K, VT> other = StreamsContext.maybeUnwrap(table);
return this.context.wrap(this.wrapped.join(other, joiner));
}

@Override
public <VT, VR> ImprovedKStream<K, VR> join(final KTable<K, VT> table,
final ValueJoinerWithKey<? super K, ? super V, ? super VT, ? extends VR> joiner) {
return this.context.wrap(this.wrapped.join(table, joiner));
final KTable<K, VT> other = StreamsContext.maybeUnwrap(table);
return this.context.wrap(this.wrapped.join(other, joiner));
}

@Override
public <VT, VR> ImprovedKStream<K, VR> join(final KTable<K, VT> table,
final ValueJoiner<? super V, ? super VT, ? extends VR> joiner, final Joined<K, V, VT> joined) {
return this.context.wrap(this.wrapped.join(table, joiner, joined));
final KTable<K, VT> other = StreamsContext.maybeUnwrap(table);
return this.context.wrap(this.wrapped.join(other, joiner, joined));
}

@Override
public <VT, VR> ImprovedKStream<K, VR> join(final KTable<K, VT> table,
final ValueJoinerWithKey<? super K, ? super V, ? super VT, ? extends VR> joiner,
final Joined<K, V, VT> joined) {
return this.context.wrap(this.wrapped.join(table, joiner, joined));
final KTable<K, VT> other = StreamsContext.maybeUnwrap(table);
return this.context.wrap(this.wrapped.join(other, joiner, joined));
}

@Override
public <VT, VR> ImprovedKStream<K, VR> leftJoin(final KTable<K, VT> table,
final ValueJoiner<? super V, ? super VT, ? extends VR> joiner) {
return this.context.wrap(this.wrapped.leftJoin(table, joiner));
final KTable<K, VT> other = StreamsContext.maybeUnwrap(table);
return this.context.wrap(this.wrapped.leftJoin(other, joiner));
}

@Override
public <VT, VR> ImprovedKStream<K, VR> leftJoin(final KTable<K, VT> table,
final ValueJoinerWithKey<? super K, ? super V, ? super VT, ? extends VR> joiner) {
return this.context.wrap(this.wrapped.leftJoin(table, joiner));
final KTable<K, VT> other = StreamsContext.maybeUnwrap(table);
return this.context.wrap(this.wrapped.leftJoin(other, joiner));
}

@Override
public <VT, VR> ImprovedKStream<K, VR> leftJoin(final KTable<K, VT> table,
final ValueJoiner<? super V, ? super VT, ? extends VR> joiner, final Joined<K, V, VT> joined) {
return this.context.wrap(this.wrapped.leftJoin(table, joiner, joined));
final KTable<K, VT> other = StreamsContext.maybeUnwrap(table);
return this.context.wrap(this.wrapped.leftJoin(other, joiner, joined));
}

@Override
public <VT, VR> ImprovedKStream<K, VR> leftJoin(final KTable<K, VT> table,
final ValueJoinerWithKey<? super K, ? super V, ? super VT, ? extends VR> joiner,
final Joined<K, V, VT> joined) {
return this.context.wrap(this.wrapped.leftJoin(table, joiner, joined));
final KTable<K, VT> other = StreamsContext.maybeUnwrap(table);
return this.context.wrap(this.wrapped.leftJoin(other, joiner, joined));
}

@Override
Expand Down
Loading

0 comments on commit 5377422

Please sign in to comment.