diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ConfiguredStores.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ConfiguredStores.java new file mode 100644 index 00000000..0c8093a5 --- /dev/null +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ConfiguredStores.java @@ -0,0 +1,86 @@ +/* + * MIT License + * + * Copyright (c) 2025 bakdata + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.bakdata.kafka; + +import lombok.NonNull; +import lombok.RequiredArgsConstructor; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.SessionBytesStoreSupplier; +import org.apache.kafka.streams.state.SessionStore; +import org.apache.kafka.streams.state.StoreBuilder; +import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.streams.state.TimestampedKeyValueStore; +import org.apache.kafka.streams.state.TimestampedWindowStore; +import org.apache.kafka.streams.state.VersionedBytesStoreSupplier; +import org.apache.kafka.streams.state.VersionedKeyValueStore; +import org.apache.kafka.streams.state.WindowBytesStoreSupplier; +import org.apache.kafka.streams.state.WindowStore; + +@RequiredArgsConstructor +public class ConfiguredStores { + + private final @NonNull Configurator configurator; + + public StoreBuilder> sessionStoreBuilder(final SessionBytesStoreSupplier supplier, + final Preconfigured> keySerde, final Preconfigured> valueSerde) { + return Stores.sessionStoreBuilder(supplier, this.configurator.configureForKeys(keySerde), + this.configurator.configureForValues(valueSerde)); + } + + public StoreBuilder> timestampedWindowStoreBuilder( + final WindowBytesStoreSupplier supplier, final Preconfigured> keySerde, + final Preconfigured> valueSerde) { + return Stores.timestampedWindowStoreBuilder(supplier, this.configurator.configureForKeys(keySerde), + this.configurator.configureForValues(valueSerde)); + } + + public StoreBuilder> windowStoreBuilder(final WindowBytesStoreSupplier supplier, + final Preconfigured> keySerde, final Preconfigured> valueSerde) { + return Stores.windowStoreBuilder(supplier, this.configurator.configureForKeys(keySerde), + this.configurator.configureForValues(valueSerde)); + } + + public StoreBuilder> versionedKeyValueStoreBuilder( + final VersionedBytesStoreSupplier supplier, final Preconfigured> keySerde, + final Preconfigured> valueSerde) { + return Stores.versionedKeyValueStoreBuilder(supplier, this.configurator.configureForKeys(keySerde), + this.configurator.configureForValues(valueSerde)); + } + + public StoreBuilder> timestampedKeyValueStoreBuilder( + final KeyValueBytesStoreSupplier supplier, final Preconfigured> keySerde, + final Preconfigured> valueSerde) { + return Stores.timestampedKeyValueStoreBuilder(supplier, this.configurator.configureForKeys(keySerde), + this.configurator.configureForValues(valueSerde)); + } + + public StoreBuilder> keyValueStoreBuilder(final KeyValueBytesStoreSupplier supplier, + final Preconfigured> keySerde, final Preconfigured> valueSerde) { + return Stores.keyValueStoreBuilder(supplier, this.configurator.configureForKeys(keySerde), + this.configurator.configureForValues(valueSerde)); + } +} diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/TopologyBuilder.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/TopologyBuilder.java index 34f90eb7..0a6ec451 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/TopologyBuilder.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/TopologyBuilder.java @@ -229,6 +229,10 @@ public StreamsContext getContext() { return new StreamsContext(this.topics, this.createConfigurator()); } + public ConfiguredStores stores() { + return new ConfiguredStores(this.createConfigurator()); + } + Topology build() { return this.streamsBuilder.build(); }