Skip to content

Commit

Permalink
Add error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
philipp94831 committed Jan 27, 2025
1 parent 8f1b481 commit d321b8c
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,17 @@ public static <K, V> ConfiguredConsumed<K, V> as(final String processorName) {

Consumed<K, V> configure(final Configurator configurator) {
return Consumed.<K, V>as(this.name)
.withKeySerde(configurator.configureForKeys(this.keySerde))
.withValueSerde(configurator.configureForValues(this.valueSerde))
.withKeySerde(this.configureKeySerde(configurator))
.withValueSerde(this.configureValueSerde(configurator))
.withOffsetResetPolicy(this.offsetResetPolicy)
.withTimestampExtractor(this.timestampExtractor);
}

private Serde<V> configureValueSerde(final Configurator configurator) {
return this.valueSerde == null ? null : configurator.configureForValues(this.valueSerde);
}

private Serde<K> configureKeySerde(final Configurator configurator) {
return this.keySerde == null ? null : configurator.configureForKeys(this.keySerde);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,16 @@ public static <K, V> ConfiguredProduced<K, V> as(final String processorName) {

Produced<K, V> configure(final Configurator configurator) {
return Produced.<K, V>as(this.name)
.withKeySerde(configurator.configureForKeys(this.keySerde))
.withValueSerde(configurator.configureForValues(this.valueSerde))
.withKeySerde(this.configureKeySerde(configurator))
.withValueSerde(this.configuredValueSerde(configurator))
.withStreamPartitioner(this.streamPartitioner);
}

private Serde<V> configuredValueSerde(final Configurator configurator) {
return this.valueSerde == null ? null : configurator.configureForValues(this.valueSerde);
}

private Serde<K> configureKeySerde(final Configurator configurator) {
return this.keySerde == null ? null : configurator.configureForKeys(this.keySerde);
}
}

0 comments on commit d321b8c

Please sign in to comment.