Skip to content

Commit

Permalink
Create v3
Browse files Browse the repository at this point in the history
  • Loading branch information
philipp94831 committed Apr 8, 2024
1 parent dbf7f18 commit a3078c3
Show file tree
Hide file tree
Showing 18 changed files with 296 additions and 174 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public void deleted(final String topic) {
* Register a hook that cleans up LargeMessage files associated with a topic.
*
* @param cleanUpRunner {@code CleanUpRunner} to register hook on
* @return self for chaining
* @see #createLargeMessageCleanUpHook(Map)
*/
public static <T> T registerLargeMessageCleanUpHook(final HasTopicHooks<T> cleanUpRunner) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,19 @@
* </ul>
* To implement your Kafka application inherit from this class and add your custom options. Run it by calling
* {@link #startApplication(KafkaApplication, String[])} with a instance of your class from your main.
*
* @param <R> type of {@link Runner} used by this app
* @param <C> type of {@link CleanUpRunner} used by this app
* @param <O> type of options to create runner
* @param <A> type of app
*/
@ToString
@Getter
@Setter
@RequiredArgsConstructor
@Slf4j
@Command(mixinStandardHelpOptions = true)
public abstract class KafkaApplication<R extends Runner, C extends CleanUpRunner, O>
public abstract class KafkaApplication<R extends Runner, C extends CleanUpRunner, O, A>
implements Runnable, AutoCloseable {
private static final String ENV_PREFIX = Optional.ofNullable(System.getenv("ENV_PREFIX")).orElse("APP_");
@CommandLine.Option(names = "--output-topic", description = "Output topic")
Expand All @@ -82,9 +87,9 @@ public abstract class KafkaApplication<R extends Runner, C extends CleanUpRunner
@CommandLine.Option(names = "--kafka-config", split = ",", description = "Additional Kafka properties")
private Map<String, String> kafkaConfig = new HashMap<>();
@ToString.Exclude
// ConcurrentLinkedDeque required because calling #close() causes asynchronous #run() calls to finish and thus
// concurrently iterating on #runners and removing from #runners
private ConcurrentLinkedDeque<RunningApp<Runner>> runningApps = new ConcurrentLinkedDeque<>();
// ConcurrentLinkedDeque required because calling #stop() causes asynchronous #run() calls to finish and thus
// concurrently iterating and removing from #runners
private ConcurrentLinkedDeque<Stoppable> activeApps = new ConcurrentLinkedDeque<>();

/**
* <p>This methods needs to be called in the executable custom application class inheriting from
Expand All @@ -95,7 +100,7 @@ public abstract class KafkaApplication<R extends Runner, C extends CleanUpRunner
* @param args Arguments passed in by the custom application class.
* @see #startApplicationWithoutExit(KafkaApplication, String[])
*/
public static void startApplication(final KafkaApplication<?, ?, ?> app, final String[] args) {
public static void startApplication(final KafkaApplication<?, ?, ?, ?> app, final String[] args) {
final int exitCode = startApplicationWithoutExit(app, args);
System.exit(exitCode);
}
Expand All @@ -108,7 +113,7 @@ public static void startApplication(final KafkaApplication<?, ?, ?> app, final S
* @param args Arguments passed in by the custom application class.
* @return Exit code of application
*/
public static int startApplicationWithoutExit(final KafkaApplication<?, ?, ?> app, final String[] args) {
public static int startApplicationWithoutExit(final KafkaApplication<?, ?, ?, ?> app, final String[] args) {
final String[] populatedArgs = addEnvironmentVariablesArguments(args);
final CommandLine commandLine = new CommandLine(app)
.setExecutionStrategy(app::execute);
Expand All @@ -129,8 +134,8 @@ private static String[] addEnvironmentVariablesArguments(final String[] args) {
* Clean all resources associated with this application
*/
public void clean() {
try (final ExecutableApp<?, ? extends CleanUpRunner, ?> app = this.createExecutableApp(true)) {
final CleanUpRunner cleanUpRunner = app.createCleanUpRunner();
try (final CleanableApp cleanableApp = this.createCleanableApp()) {
final C cleanUpRunner = cleanableApp.getCleanUpRunner();
cleanUpRunner.clean();
}
}
Expand All @@ -144,21 +149,20 @@ public void close() {
}

/**
* Stop all applications that have been started by {@link #run()}.
* Stop all applications that have been started asynchronously, e.g., by using {@link #run()} or {@link #clean()}.
*/
public final void stop() {
this.runningApps.forEach(RunningApp::close);
this.activeApps.forEach(Stoppable::stop);
}

/**
* Run the application.
*/
@Override
public void run() {
try (final RunningApp<Runner> runningApp = this.createRunningApp()) {
this.runningApps.add(runningApp);
runningApp.run();
this.runningApps.remove(runningApp);
try (final RunnableApp runnableApp = this.createRunnableApp()) {
final R runner = runnableApp.getRunner();
runner.run();
}
}

Expand All @@ -171,13 +175,59 @@ protected void configureDebug() {
Configurator.setLevel(this.getClass().getPackageName(), Level.DEBUG);
}

/**
* Create configuration to configure app
* @return configuration
*/
abstract Configuration<A, ? extends ConfiguredApp<? extends ExecutableApp<R, C, O>>> createConfiguration();

/**
* Create a new app that will be configured and executed according to this application.
* @param cleanUp whether app is created for clean up purposes. In that case, the user might want
* to skip initialization of expensive resources.
* @return app
*/
abstract A createApp(boolean cleanUp);

/**
* Create options for running the app
* @return run options if available
* @see ExecutableApp#createRunner(Object)
*/
abstract Optional<O> createExecutionOptions();

/**
* Create a new {@code CleanableApp}
* @return {@code CleanableApp}
*/
final CleanableApp createCleanableApp() {
final ExecutableApp<R, C, O> executableApp = this.createExecutableApp(true);
final C cleanUpRunner = executableApp.createCleanUpRunner();
final CleanableApp cleanableApp = new CleanableApp(executableApp, cleanUpRunner);
this.activeApps.add(cleanableApp);
return cleanableApp;
}

/**
* Create a new {@code RunnableApp}
* @return {@code RunnableApp}
*/
final RunnableApp createRunnableApp() {
final ExecutableApp<R, ?, O> app = this.createExecutableApp(false);
final Optional<O> executionOptions = this.createExecutionOptions();
final R runner = executionOptions.map(app::createRunner).orElseGet(app::createRunner);
final RunnableApp runnableApp = new RunnableApp(app, runner);
this.activeApps.add(runnableApp);
return runnableApp;
}

/**
* Create a new {@code ExecutableApp} that will be executed according to the requested command.
* @param cleanUp whether app is created for clean up purposes. In that case, the user might want to skip
* initialization of expensive resources.
* @return {@code ExecutableApp}
*/
final ExecutableApp<R, C, O> createExecutableApp(final boolean cleanUp) {
private ExecutableApp<R, C, O> createExecutableApp(final boolean cleanUp) {
final ConfiguredApp<? extends ExecutableApp<R, C, O>> configuredStreamsApp =
this.createConfiguredApp(cleanUp);
final KafkaEndpointConfig endpointConfig = this.getEndpointConfig();
Expand All @@ -190,14 +240,12 @@ final ExecutableApp<R, C, O> createExecutableApp(final boolean cleanUp) {
* to skip initialization of expensive resources.
* @return {@code ConfiguredApp}
*/
abstract ConfiguredApp<? extends ExecutableApp<R, C, O>> createConfiguredApp(boolean cleanUp);

/**
* Create options for running the app
* @return run options
* @see ExecutableApp#createRunner(Object)
*/
abstract O createExecutionOptions();
private ConfiguredApp<? extends ExecutableApp<R, C, O>> createConfiguredApp(final boolean cleanUp) {
final A app = this.createApp(cleanUp);
final Configuration<A, ? extends ConfiguredApp<? extends ExecutableApp<R, C, O>>> configuration =
this.createConfiguration();
return configuration.configure(app);
}

private KafkaEndpointConfig getEndpointConfig() {
return KafkaEndpointConfig.builder()
Expand All @@ -222,27 +270,46 @@ private int execute(final ParseResult parseResult) {
return exitCode;
}

private RunningApp<Runner> createRunningApp() {
final ExecutableApp<R, ?, O> app = this.createExecutableApp(false);
final O executionOptions = this.createExecutionOptions();
final Runner runner = app.createRunner(executionOptions);
return new RunningApp<>(app, runner);
@FunctionalInterface
private interface Stoppable {
void stop();
}

@RequiredArgsConstructor
private static class RunningApp<T extends Runner> implements AutoCloseable {
class CleanableApp implements AutoCloseable, Stoppable {
private final @NonNull ExecutableApp<?, ?, ?> app;
private final @NonNull T runner;
@Getter
private final @NonNull C cleanUpRunner;

@Override
public void close() {
this.runner.close();
// close app after runner because messages currently processed might depend on resources
this.stop();
KafkaApplication.this.activeApps.remove(this);
}

@Override
public void stop() {
this.app.close();
}
}

private void run() {
this.runner.run();
@RequiredArgsConstructor
class RunnableApp implements AutoCloseable, Stoppable {
private final @NonNull ExecutableApp<?, ?, ?> app;
@Getter
private final @NonNull R runner;

@Override
public void close() {
this.stop();
KafkaApplication.this.activeApps.remove(this);
}

@Override
public void stop() {
this.runner.close();
// close app after runner because messages currently processed might depend on resources
this.app.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
package com.bakdata.kafka;

import java.util.Map;
import java.util.Optional;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
Expand All @@ -46,15 +47,7 @@
@Slf4j
@Command(description = "Run a Kafka Producer application")
public abstract class KafkaProducerApplication
extends KafkaApplication<ProducerRunner, ProducerCleanUpRunner, ProducerExecutionOptions> {

/**
* @see ProducerRunner#run()
*/
@Override
public void run() {
super.run();
}
extends KafkaApplication<ProducerRunner, ProducerCleanUpRunner, ProducerExecutionOptions, ProducerApp> {

/**
* Delete all output topics associated with the Kafka Producer application.
Expand All @@ -66,24 +59,21 @@ public void clean() {
}

/**
* Create a new {@code ProducerApp} that will be configured and executed according to this application.
* @return {@code ProducerApp}
* @see ProducerRunner#run()
*/
protected abstract ProducerApp createApp(boolean cleanUp);

@Override
final ProducerExecutionOptions createExecutionOptions() {
return ProducerExecutionOptions.builder().build();
public void run() {
super.run();
}

/**
* Create a new {@code ProducerApp} that will be configured and executed according to this application.
*/
@Override
final ConfiguredProducerApp<ProducerApp> createConfiguredApp(final boolean cleanUp) {
final ProducerApp producerApp = this.createApp(cleanUp);
final ProducerAppConfiguration configuration = this.createConfiguration();
return new ConfiguredProducerApp<>(producerApp, configuration);
}
protected abstract ProducerApp createApp(boolean cleanUp);

private ProducerAppConfiguration createConfiguration() {
@Override
final ProducerAppConfiguration createConfiguration() {
final ProducerTopicConfig topics = this.createTopicConfig();
final Map<String, String> kafkaConfig = this.getKafkaConfig();
return ProducerAppConfiguration.builder()
Expand All @@ -92,6 +82,11 @@ private ProducerAppConfiguration createConfiguration() {
.build();
}

@Override
final Optional<ProducerExecutionOptions> createExecutionOptions() {
return Optional.empty();
}

private ProducerTopicConfig createTopicConfig() {
return ProducerTopicConfig.builder()
.outputTopic(this.getOutputTopic())
Expand Down
Loading

0 comments on commit a3078c3

Please sign in to comment.