diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ConfiguredConsumed.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ConfiguredConsumed.java index 12d894b6..ffda6052 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ConfiguredConsumed.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ConfiguredConsumed.java @@ -61,9 +61,17 @@ public static ConfiguredConsumed as(final String processorName) { Consumed configure(final Configurator configurator) { return Consumed.as(this.name) - .withKeySerde(configurator.configureForKeys(this.keySerde)) - .withValueSerde(configurator.configureForValues(this.valueSerde)) + .withKeySerde(this.configureKeySerde(configurator)) + .withValueSerde(this.configureValueSerde(configurator)) .withOffsetResetPolicy(this.offsetResetPolicy) .withTimestampExtractor(this.timestampExtractor); } + + private Serde configureValueSerde(final Configurator configurator) { + return this.valueSerde == null ? null : configurator.configureForValues(this.valueSerde); + } + + private Serde configureKeySerde(final Configurator configurator) { + return this.keySerde == null ? null : configurator.configureForKeys(this.keySerde); + } } diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ConfiguredProduced.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ConfiguredProduced.java index 49f212a9..62d0d161 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ConfiguredProduced.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ConfiguredProduced.java @@ -59,8 +59,16 @@ public static ConfiguredProduced as(final String processorName) { Produced configure(final Configurator configurator) { return Produced.as(this.name) - .withKeySerde(configurator.configureForKeys(this.keySerde)) - .withValueSerde(configurator.configureForValues(this.valueSerde)) + .withKeySerde(this.configureKeySerde(configurator)) + .withValueSerde(this.configuredValueSerde(configurator)) .withStreamPartitioner(this.streamPartitioner); } + + private Serde configuredValueSerde(final Configurator configurator) { + return this.valueSerde == null ? null : configurator.configureForValues(this.valueSerde); + } + + private Serde configureKeySerde(final Configurator configurator) { + return this.keySerde == null ? null : configurator.configureForKeys(this.keySerde); + } }