Skip to content

Commit

Permalink
Create v3
Browse files Browse the repository at this point in the history
  • Loading branch information
philipp94831 committed Apr 16, 2024
1 parent 1896e7e commit d603699
Show file tree
Hide file tree
Showing 26 changed files with 124 additions and 259 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@
public interface LargeMessageProducerApp extends ProducerApp {

@Override
default ProducerCleanUpConfiguration setupCleanUp(final EffectiveProducerAppConfiguration configuration) {
default ProducerCleanUpConfiguration setupCleanUp(
final EffectiveAppConfiguration<ProducerTopicConfig> configuration) {
final ProducerCleanUpConfiguration configurer = ProducerApp.super.setupCleanUp(configuration);
return configurer.registerTopicHook(
LargeMessageKafkaApplicationUtils.createLargeMessageCleanUpHook(configuration.getKafkaProperties()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@
public interface LargeMessageStreamsApp extends StreamsApp {

@Override
default StreamsCleanUpConfiguration setupCleanUp(final EffectiveStreamsAppConfiguration configuration) {
default StreamsCleanUpConfiguration setupCleanUp(
final EffectiveAppConfiguration<StreamsTopicConfig> configuration) {
final StreamsCleanUpConfiguration configurer = StreamsApp.super.setupCleanUp(configuration);
return configurer.registerTopicHook(
LargeMessageKafkaApplicationUtils.createLargeMessageCleanUpHook(configuration.getKafkaProperties()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,10 @@ public void run() {
* Create configuration to configure app
* @return configuration
*/
public final ProducerAppConfiguration createConfiguration() {
public final AppConfiguration<ProducerTopicConfig> createConfiguration() {
final ProducerTopicConfig topics = this.createTopicConfig();
final Map<String, String> kafkaConfig = this.getKafkaConfig();
return ProducerAppConfiguration.builder()
.topics(topics)
.kafkaConfig(kafkaConfig)
.build();
return new AppConfiguration<>(topics, kafkaConfig);
}

@Override
Expand All @@ -107,7 +104,7 @@ public final ProducerTopicConfig createTopicConfig() {
@Override
public final ConfiguredProducerApp<ProducerApp> createConfiguredApp(final boolean cleanUp) {
final ProducerApp app = this.createApp(cleanUp);
final ProducerAppConfiguration configuration = this.createConfiguration();
final AppConfiguration<ProducerTopicConfig> configuration = this.createConfiguration();
return new ConfiguredProducerApp<>(app, configuration);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -125,13 +125,10 @@ public void reset() {
* Create configuration to configure app
* @return configuration
*/
public final StreamsAppConfiguration createConfiguration() {
public final AppConfiguration<StreamsTopicConfig> createConfiguration() {
final StreamsTopicConfig topics = this.createTopicConfig();
final Map<String, String> kafkaConfig = this.getKafkaConfig();
return StreamsAppConfiguration.builder()
.topics(topics)
.kafkaConfig(kafkaConfig)
.build();
return new AppConfiguration<>(topics, kafkaConfig);
}

@Override
Expand Down Expand Up @@ -164,7 +161,7 @@ public final StreamsTopicConfig createTopicConfig() {
@Override
public final ConfiguredStreamsApp<StreamsApp> createConfiguredApp(final boolean cleanUp) {
final StreamsApp app = this.createApp(cleanUp);
final StreamsAppConfiguration configuration = this.createConfiguration();
final AppConfiguration<StreamsTopicConfig> configuration = this.createConfiguration();
return new ConfiguredStreamsApp<>(app, configuration);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,24 @@
import static java.util.Collections.emptyMap;

import java.util.Map;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.Value;

/**
* Configuration of {@link StreamsApp}. This includes {@link StreamsTopicConfig} and Kafka configuration
* Configuration of an app. This includes topics and Kafka configuration
*/
@Builder
@Value
@RequiredArgsConstructor
@EqualsAndHashCode
public class StreamsAppConfiguration {
@Builder.Default
@NonNull StreamsTopicConfig topics = StreamsTopicConfig.builder().build();
@Builder.Default
@NonNull Map<String, ?> kafkaConfig = emptyMap();
public class AppConfiguration<T> {
@NonNull
T topics;
@NonNull
Map<String, ?> kafkaConfig;

public AppConfiguration(final T topics) {
this(topics, emptyMap());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,14 @@
import org.apache.kafka.common.serialization.StringSerializer;

/**
* A {@link ProducerApp} with a corresponding {@link ProducerAppConfiguration}
* A {@link ProducerApp} with a corresponding {@link AppConfiguration}
* @param <T> type of {@link ProducerApp}
*/
@RequiredArgsConstructor
public class ConfiguredProducerApp<T extends ProducerApp> implements ConfiguredApp<ExecutableProducerApp<T>> {
@Getter
private final @NonNull T app;
private final @NonNull ProducerAppConfiguration configuration;
private final @NonNull AppConfiguration<ProducerTopicConfig> configuration;

private static Map<String, Object> createKafkaProperties(final KafkaEndpointConfig endpointConfig) {
final Map<String, Object> kafkaConfig = new HashMap<>();
Expand Down Expand Up @@ -89,7 +89,7 @@ private static Map<String, Object> createKafkaProperties(final KafkaEndpointConf
* {@link EnvironmentKafkaConfigParser#parseVariables(Map)})
* </li>
* <li>
* Configs provided by {@link ProducerAppConfiguration#getKafkaConfig()}
* Configs provided by {@link AppConfiguration#getKafkaConfig()}
* </li>
* <li>
* Configs provided by {@link KafkaEndpointConfig#createKafkaProperties()}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,14 @@
import org.apache.kafka.streams.Topology;

/**
* A {@link StreamsApp} with a corresponding {@link StreamsAppConfiguration}
* A {@link StreamsApp} with a corresponding {@link AppConfiguration}
* @param <T> type of {@link StreamsApp}
*/
@RequiredArgsConstructor
public class ConfiguredStreamsApp<T extends StreamsApp> implements ConfiguredApp<ExecutableStreamsApp<T>> {
@Getter
private final @NonNull T app;
private final @NonNull StreamsAppConfiguration configuration;
private final @NonNull AppConfiguration<StreamsTopicConfig> configuration;

private static Map<String, Object> createKafkaProperties(final KafkaEndpointConfig endpointConfig) {
final Map<String, Object> kafkaConfig = new HashMap<>();
Expand Down Expand Up @@ -96,7 +96,7 @@ private static Map<String, Object> createKafkaProperties(final KafkaEndpointConf
* {@link EnvironmentKafkaConfigParser#parseVariables(Map)})
* </li>
* <li>
* Configs provided by {@link StreamsAppConfiguration#getKafkaConfig()}
* Configs provided by {@link AppConfiguration#getKafkaConfig()}
* </li>
* <li>
* Configs provided by {@link KafkaEndpointConfig#createKafkaProperties()}
Expand Down Expand Up @@ -145,10 +145,8 @@ public StreamsTopicConfig getTopics() {
public ExecutableStreamsApp<T> withEndpoint(final KafkaEndpointConfig endpointConfig) {
final Map<String, Object> kafkaProperties = this.getKafkaProperties(endpointConfig);
final Topology topology = this.createTopology(kafkaProperties);
final EffectiveStreamsAppConfiguration effectiveConfiguration = EffectiveStreamsAppConfiguration.builder()
.kafkaProperties(kafkaProperties)
.topics(this.getTopics())
.build();
final EffectiveAppConfiguration<StreamsTopicConfig> effectiveConfiguration =
new EffectiveAppConfiguration<>(this.getTopics(), kafkaProperties);
return ExecutableStreamsApp.<T>builder()
.topology(topology)
.config(new StreamsConfig(kafkaProperties))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,28 +24,26 @@

package com.bakdata.kafka;

import static java.util.Collections.emptyMap;

import com.bakdata.kafka.util.ImprovedAdminClient;
import java.util.Map;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.NonNull;
import lombok.Value;

/**
* Configuration for setting up a {@link StreamsApp}
* @see StreamsApp#setup(EffectiveStreamsAppConfiguration)
* @see StreamsApp#setupCleanUp(EffectiveStreamsAppConfiguration)
* Configuration for setting up an app
* @see StreamsApp#setup(EffectiveAppConfiguration)
* @see StreamsApp#setupCleanUp(EffectiveAppConfiguration)
* @see ProducerApp#setup(EffectiveAppConfiguration)
* @see ProducerApp#setupCleanUp(EffectiveAppConfiguration)
*/
@Builder
@Value
@EqualsAndHashCode
public class EffectiveStreamsAppConfiguration {
@Builder.Default
@NonNull StreamsTopicConfig topics = StreamsTopicConfig.builder().build();
@Builder.Default
@NonNull Map<String, Object> kafkaProperties = emptyMap();
public class EffectiveAppConfiguration<T> {
@NonNull
T topics;
@NonNull
Map<String, Object> kafkaProperties;

/**
* Create a new {@code ImprovedAdminClient} using {@link #kafkaProperties}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public class ExecutableProducerApp<T extends ProducerApp>
*/
@Override
public ProducerCleanUpRunner createCleanUpRunner() {
final EffectiveProducerAppConfiguration configuration = this.createEffectiveConfiguration();
final EffectiveAppConfiguration<ProducerTopicConfig> configuration = this.createEffectiveConfiguration();
final ProducerCleanUpConfiguration configurer = this.app.setupCleanUp(configuration);
return ProducerCleanUpRunner.create(this.topics, this.kafkaProperties, configurer);
}
Expand All @@ -68,7 +68,7 @@ public ProducerRunner createRunner(final ProducerExecutionOptions options) {
.topics(this.topics)
.kafkaProperties(this.kafkaProperties)
.build();
final EffectiveProducerAppConfiguration configuration = this.createEffectiveConfiguration();
final EffectiveAppConfiguration<ProducerTopicConfig> configuration = this.createEffectiveConfiguration();
this.app.setup(configuration);
return new ProducerRunner(this.app.buildRunnable(producerBuilder));
}
Expand All @@ -78,10 +78,7 @@ public void close() {
this.app.close();
}

private EffectiveProducerAppConfiguration createEffectiveConfiguration() {
return EffectiveProducerAppConfiguration.builder()
.topics(this.topics)
.kafkaProperties(this.kafkaProperties)
.build();
private EffectiveAppConfiguration<ProducerTopicConfig> createEffectiveConfiguration() {
return new EffectiveAppConfiguration<>(this.topics, this.kafkaProperties);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public interface ProducerApp extends AutoCloseable {
* Setup Kafka resources, such as topics, before running this app
* @param configuration provides all runtime application configurations
*/
default void setup(final EffectiveProducerAppConfiguration configuration) {
default void setup(final EffectiveAppConfiguration<ProducerTopicConfig> configuration) {
// do nothing by default
}

Expand All @@ -63,7 +63,8 @@ default Map<String, Object> createKafkaProperties() {
* @return {@code ProducerCleanUpConfiguration}
* @see ProducerCleanUpRunner
*/
default ProducerCleanUpConfiguration setupCleanUp(final EffectiveProducerAppConfiguration configuration) {
default ProducerCleanUpConfiguration setupCleanUp(
final EffectiveAppConfiguration<ProducerTopicConfig> configuration) {
return new ProducerCleanUpConfiguration();
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,10 @@ public Configurator createConfigurator() {
}

/**
* Create {@code EffectiveProducerAppConfiguration} used by this app
* @return {@code EffectiveProducerAppConfiguration}
* Create {@code EffectiveAppConfiguration} used by this app
* @return {@code EffectiveAppConfiguration}
*/
public EffectiveProducerAppConfiguration createEffectiveConfiguration() {
return EffectiveProducerAppConfiguration.builder()
.kafkaProperties(this.kafkaProperties)
.topics(this.topics)
.build();
public EffectiveAppConfiguration<ProducerTopicConfig> createEffectiveConfiguration() {
return new EffectiveAppConfiguration<>(this.topics, this.kafkaProperties);
}
}
Loading

0 comments on commit d603699

Please sign in to comment.