Skip to content

Commit

Permalink
Create v3
Browse files Browse the repository at this point in the history
  • Loading branch information
philipp94831 committed Apr 5, 2024
1 parent 33d841a commit 220083a
Show file tree
Hide file tree
Showing 7 changed files with 12 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -149,21 +148,6 @@ protected KafkaEndpointConfig getEndpointConfig() {
.build();
}

/**
* Kafka configurations overrides
* @return Kafka configuration
*/
protected Map<String, Object> getKafkaConfigOverrides() {
return Collections.emptyMap();
}

protected Map<String, Object> getFullKafkaConfig() {
final Map<String, Object> config = new HashMap<>();
config.putAll(this.getKafkaConfigOverrides());
config.putAll(this.kafkaConfig);
return Collections.unmodifiableMap(config);
}

private void startApplication() {
log.info("Starting application");
Runtime.getRuntime().addShutdownHook(new Thread(this::close));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ private ConfiguredProducerApp<ProducerApp> createConfiguredApp() {

private ProducerAppConfiguration createConfiguration() {
final ProducerTopicConfig topics = this.createTopicConfig();
final Map<String, Object> kafkaConfig = this.getFullKafkaConfig();
final Map<String, String> kafkaConfig = this.getKafkaConfig();
return ProducerAppConfiguration.builder()
.topics(topics)
.kafkaConfig(kafkaConfig)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ private ConfiguredStreamsApp<StreamsApp> createConfiguredApp(final boolean clean

private StreamsAppConfiguration createConfiguration() {
final StreamsTopicConfig topics = this.createTopicConfig();
final Map<String, Object> kafkaConfig = this.getFullKafkaConfig();
final Map<String, String> kafkaConfig = this.getKafkaConfig();
final StreamsConfigurationOptions streamsOptions = this.createStreamsOptions();
return StreamsAppConfiguration.builder()
.topics(topics)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@
@FunctionalInterface
public interface ProducerApp extends AutoCloseable {

/**
* Setup Kafka resources, such as topics, before running this app
* @param configuration provides all runtime application configurations
*/
default void setup(final ProducerAppSetupConfiguration configuration) {
// do nothing by default
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,5 @@ public class ProducerAppConfiguration {
@Builder.Default
@NonNull ProducerTopicConfig topics = ProducerTopicConfig.builder().build();
@Builder.Default
@NonNull Map<String, Object> kafkaConfig = emptyMap();
@NonNull Map<String, ?> kafkaConfig = emptyMap();
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@
public interface StreamsApp extends AutoCloseable {
int DEFAULT_PRODUCTIVE_REPLICATION_FACTOR = 3;

/**
* Setup Kafka resources, such as topics, before running this app
* @param configuration provides all runtime application configurations
*/
default void setup(final StreamsAppSetupConfiguration configuration) {
// do nothing by default
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class StreamsAppConfiguration {
@Builder.Default
@NonNull StreamsTopicConfig topics = StreamsTopicConfig.builder().build();
@Builder.Default
@NonNull Map<String, Object> kafkaConfig = emptyMap();
@NonNull Map<String, ?> kafkaConfig = emptyMap();
@Builder.Default
@NonNull StreamsConfigurationOptions options = StreamsConfigurationOptions.builder().build();
}

0 comments on commit 220083a

Please sign in to comment.