Skip to content

Commit

Permalink
Create v3
Browse files Browse the repository at this point in the history
  • Loading branch information
philipp94831 committed Apr 5, 2024
1 parent 461152b commit 49f12fd
Show file tree
Hide file tree
Showing 21 changed files with 327 additions and 144 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedDeque;
import lombok.Getter;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.ToString;
Expand Down Expand Up @@ -64,7 +66,7 @@
@RequiredArgsConstructor
@Slf4j
@Command(mixinStandardHelpOptions = true)
public abstract class KafkaApplication implements Runnable, AutoCloseable {
public abstract class KafkaApplication<O> implements Runnable, AutoCloseable {
private static final String ENV_PREFIX = Optional.ofNullable(System.getenv("ENV_PREFIX")).orElse("APP_");
@CommandLine.Option(names = "--output-topic", description = "Output topic")
private String outputTopic;
Expand All @@ -78,6 +80,10 @@ public abstract class KafkaApplication implements Runnable, AutoCloseable {
private String schemaRegistryUrl;
@CommandLine.Option(names = "--kafka-config", split = ",", description = "Additional Kafka properties")
private Map<String, String> kafkaConfig = new HashMap<>();
@ToString.Exclude
// ConcurrentLinkedDeque required because calling #close() causes asynchronous #run() calls to finish and thus
// concurrently iterating on #runners and removing from #runners
private ConcurrentLinkedDeque<RunningApp<Runner>> runningApps = new ConcurrentLinkedDeque<>();

/**
* <p>This methods needs to be called in the executable custom application class inheriting from
Expand All @@ -88,7 +94,7 @@ public abstract class KafkaApplication implements Runnable, AutoCloseable {
* @param args Arguments passed in by the custom application class.
* @see #startApplicationWithoutExit(KafkaApplication, String[])
*/
public static void startApplication(final KafkaApplication app, final String[] args) {
public static void startApplication(final KafkaApplication<?> app, final String[] args) {
final int exitCode = startApplicationWithoutExit(app, args);
System.exit(exitCode);
}
Expand All @@ -101,7 +107,7 @@ public static void startApplication(final KafkaApplication app, final String[] a
* @param args Arguments passed in by the custom application class.
* @return Exit code of application
*/
public static int startApplicationWithoutExit(final KafkaApplication app, final String[] args) {
public static int startApplicationWithoutExit(final KafkaApplication<?> app, final String[] args) {
final String[] populatedArgs = addEnvironmentVariablesArguments(args);
final CommandLine commandLine = new CommandLine(app)
.setExecutionStrategy(app::execute);
Expand All @@ -121,11 +127,31 @@ private static String[] addEnvironmentVariablesArguments(final String[] args) {
/**
* Clean all resources associated with this application
*/
public abstract void clean();
public void clean() {
try (final ExecutableApp<?, ? extends CleanUpRunner, ?> app = this.createExecutableApp(true)) {
final CleanUpRunner cleanUpRunner = app.createCleanUpRunner();
cleanUpRunner.clean();
}
}

/**
* Stop all applications that have been started by {@link #run()}.
*/
@Override
public void close() {
// do nothing by default
this.runningApps.forEach(RunningApp::close);
}

/**
* Run the application.
*/
@Override
public void run() {
try (final RunningApp<Runner> runningApp = this.createRunningApp()) {
this.runningApps.add(runningApp);
runningApp.run();
this.runningApps.remove(runningApp);
}
}

/**
Expand All @@ -148,6 +174,10 @@ protected KafkaEndpointConfig getEndpointConfig() {
.build();
}

protected abstract ExecutableApp<? extends Runner, ? extends CleanUpRunner, O> createExecutableApp(boolean cleanUp);

protected abstract O createExecutionOptions();

private void startApplication() {
log.info("Starting application");
Runtime.getRuntime().addShutdownHook(new Thread(this::close));
Expand All @@ -163,4 +193,28 @@ private int execute(final ParseResult parseResult) {
this.close();
return exitCode;
}

private RunningApp<Runner> createRunningApp() {
final ExecutableApp<? extends Runner, ?, O> app = this.createExecutableApp(false);
final O executionOptions = this.createExecutionOptions();
final Runner runner = app.createRunner(executionOptions);
return new RunningApp<>(app, runner);
}

@RequiredArgsConstructor
private static class RunningApp<T extends Runner> implements AutoCloseable {
private final @NonNull ExecutableApp<? extends T, ?, ?> app;
private final @NonNull T runner;

@Override
public void close() {
this.runner.close();
// close app after runner because messages currently processed might depend on resources
this.app.close();
}

private void run() {
this.runner.run();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
package com.bakdata.kafka;

import java.util.Map;
import java.util.concurrent.ConcurrentLinkedDeque;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
Expand All @@ -46,24 +45,14 @@
@RequiredArgsConstructor
@Slf4j
@Command(description = "Run a Kafka Producer application")
public abstract class KafkaProducerApplication extends KafkaApplication {
@ToString.Exclude
// ConcurrentLinkedDeque required because calling #close() causes asynchronous #run() calls to finish and thus
// concurrently iterating on #runners and removing from #runners
private ConcurrentLinkedDeque<ExecutableProducerApp<ProducerApp>> runningApps = new ConcurrentLinkedDeque<>();
public abstract class KafkaProducerApplication extends KafkaApplication<ProducerExecutionOptions> {

/**
* Run the application.
* @see ProducerRunner#run()
*/
@Override
public void run() {
try (final ExecutableProducerApp<ProducerApp> app = this.createExecutableApp()) {
this.runningApps.add(app);
final ProducerRunner runner = app.createRunner();
runner.run();
this.runningApps.remove(app);
}
super.run();
}

/**
Expand All @@ -72,36 +61,29 @@ public void run() {
@Command(description = "Delete all output topics associated with the Kafka Producer application.")
@Override
public void clean() {
try (final ExecutableProducerApp<ProducerApp> app = this.createExecutableApp()) {
final ProducerCleanUpRunner cleanUpRunner = app.createCleanUpRunner();
cleanUpRunner.clean();
}
super.clean();
}

/**
* @see #stop()
* Create a new {@code ProducerApp} that will be configured and executed according to this application.
* @return {@code ProducerApp}
*/
protected abstract ProducerApp createApp(boolean cleanUp);

@Override
public void close() {
super.close();
this.stop();
protected ExecutableProducerApp<ProducerApp> createExecutableApp(final boolean cleanUp) {
final ConfiguredProducerApp<ProducerApp> app = this.createConfiguredApp(cleanUp);
final KafkaEndpointConfig endpointConfig = this.getEndpointConfig();
return app.withEndpoint(endpointConfig);
}

/**
* Stop all applications that have been started by {@link #run()}.
*/
public void stop() {
this.runningApps.forEach(ExecutableProducerApp::close);
@Override
protected ProducerExecutionOptions createExecutionOptions() {
return ProducerExecutionOptions.builder().build();
}

/**
* Create a new {@code ProducerApp} that will be configured and executed according to this application.
* @return {@code ProducerApp}
*/
protected abstract ProducerApp createApp();

private ConfiguredProducerApp<ProducerApp> createConfiguredApp() {
final ProducerApp producerApp = this.createApp();
private ConfiguredProducerApp<ProducerApp> createConfiguredApp(final boolean cleanUp) {
final ProducerApp producerApp = this.createApp(cleanUp);
final ProducerAppConfiguration configuration = this.createConfiguration();
return new ConfiguredProducerApp<>(producerApp, configuration);
}
Expand All @@ -121,10 +103,4 @@ private ProducerTopicConfig createTopicConfig() {
.extraOutputTopics(this.getExtraOutputTopics())
.build();
}

private ExecutableProducerApp<ProducerApp> createExecutableApp() {
final ConfiguredProducerApp<ProducerApp> app = this.createConfiguredApp();
final KafkaEndpointConfig endpointConfig = this.getEndpointConfig();
return app.withEndpoint(endpointConfig);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,8 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.regex.Pattern;
import lombok.Getter;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.ToString;
Expand Down Expand Up @@ -64,7 +62,7 @@
@RequiredArgsConstructor
@Slf4j
@Command(description = "Run a Kafka Streams application.")
public abstract class KafkaStreamsApplication extends KafkaApplication {
public abstract class KafkaStreamsApplication extends KafkaApplication<StreamsExecutionOptions> {
@CommandLine.Option(names = "--input-topics", description = "Input topics", split = ",")
private List<String> inputTopics = new ArrayList<>();
@CommandLine.Option(names = "--input-pattern", description = "Input pattern")
Expand All @@ -79,62 +77,33 @@ public abstract class KafkaStreamsApplication extends KafkaApplication {
@CommandLine.Option(names = "--volatile-group-instance-id", arity = "0..1",
description = "Whether the group instance id is volatile, i.e., it will change on a Streams shutdown.")
private boolean volatileGroupInstanceId;
@ToString.Exclude
// ConcurrentLinkedDeque required because calling #close() causes asynchronous #run() calls to finish and thus
// concurrently iterating on #runners and removing from #runners
private ConcurrentLinkedDeque<RunningApp> runningApps = new ConcurrentLinkedDeque<>();

/**
* Run the application.
*
* @see StreamsRunner#run()
*/
@Override
public void run() {
try (final RunningApp runningApp = this.createRunningApp()) {
this.runningApps.add(runningApp);
runningApp.run();
this.runningApps.remove(runningApp);
}
}

/**
* Stop all applications that have been started by {@link #run()}.
*/
public final void stop() {
this.runningApps.forEach(RunningApp::close);
super.run();
}

/**
* Reset the Kafka Streams application. Additionally, delete the consumer group and all output and intermediate
* topics associated with the Kafka Streams application.
*/
@Command(
description = "Reset the Kafka Streams application. Additionally, delete the consumer group and all "
+ "output and intermediate topics associated with the Kafka Streams application.")
@Command(description = "Reset the Kafka Streams application. Additionally, delete the consumer group and all "
+ "output and intermediate topics associated with the Kafka Streams application.")
@Override
public void clean() {
try (final ExecutableStreamsApp<StreamsApp> app = this.createExecutableApp(true)) {
final StreamsCleanUpRunner runner = app.createCleanUpRunner();
runner.clean();
}
}

/**
* @see #stop()
*/
@Override
public void close() {
super.close();
this.stop();
super.clean();
}

/**
* Clear all state stores, consumer group offsets, and internal topics associated with the Kafka Streams
* application.
*/
@Command(
description = "Clear all state stores, consumer group offsets, and internal topics associated with the "
+ "Kafka Streams application.")
@Command(description = "Clear all state stores, consumer group offsets, and internal topics associated with the "
+ "Kafka Streams application.")
public void reset() {
try (final ExecutableStreamsApp<StreamsApp> app = this.createExecutableApp(true)) {
final StreamsCleanUpRunner runner = app.createCleanUpRunner();
Expand Down Expand Up @@ -178,18 +147,8 @@ protected void onStreamsStart(final KafkaStreams streams) {
// do nothing by default
}

private StreamsRunner createRunner(final ExecutableStreamsApp<StreamsApp> app) {
final StreamsExecutionOptions executionOptions = this.createExecutionOptions();
return app.createRunner(executionOptions);
}

private RunningApp createRunningApp() {
final ExecutableStreamsApp<StreamsApp> app = this.createExecutableApp(false);
final StreamsRunner runner = this.createRunner(app);
return new RunningApp(app, runner);
}

private StreamsExecutionOptions createExecutionOptions() {
@Override
protected StreamsExecutionOptions createExecutionOptions() {
return StreamsExecutionOptions.builder()
.volatileGroupInstanceId(this.volatileGroupInstanceId)
.uncaughtExceptionHandler(this::createUncaughtExceptionHandler)
Expand All @@ -198,6 +157,13 @@ private StreamsExecutionOptions createExecutionOptions() {
.build();
}

@Override
protected ExecutableStreamsApp<StreamsApp> createExecutableApp(final boolean cleanUp) {
final ConfiguredStreamsApp<StreamsApp> configuredStreamsApp = this.createConfiguredApp(cleanUp);
final KafkaEndpointConfig endpointConfig = this.getEndpointConfig();
return configuredStreamsApp.withEndpoint(endpointConfig);
}

private ConfiguredStreamsApp<StreamsApp> createConfiguredApp(final boolean cleanUp) {
final StreamsApp streamsApp = this.createApp(cleanUp);
final StreamsAppConfiguration streamsAppConfiguration = this.createConfiguration();
Expand All @@ -224,27 +190,4 @@ private StreamsTopicConfig createTopicConfig() {
.errorTopic(this.errorTopic)
.build();
}

private ExecutableStreamsApp<StreamsApp> createExecutableApp(final boolean cleanUp) {
final ConfiguredStreamsApp<StreamsApp> configuredStreamsApp = this.createConfiguredApp(cleanUp);
final KafkaEndpointConfig endpointConfig = this.getEndpointConfig();
return configuredStreamsApp.withEndpoint(endpointConfig);
}

@RequiredArgsConstructor
private static class RunningApp implements AutoCloseable {
private final @NonNull ExecutableStreamsApp<StreamsApp> app;
private final @NonNull StreamsRunner runner;

@Override
public void close() {
this.runner.close();
// close app after streams because messages currently processed might depend on resources
this.app.close();
}

private void run() {
this.runner.run();
}
}
}
Loading

0 comments on commit 49f12fd

Please sign in to comment.