diff --git a/streams-bootstrap-picocli/src/main/java/com/bakdata/kafka/KafkaApplication.java b/streams-bootstrap-picocli/src/main/java/com/bakdata/kafka/KafkaApplication.java index efd5a8e3..9cac0b42 100644 --- a/streams-bootstrap-picocli/src/main/java/com/bakdata/kafka/KafkaApplication.java +++ b/streams-bootstrap-picocli/src/main/java/com/bakdata/kafka/KafkaApplication.java @@ -96,7 +96,7 @@ public static void startApplication(final KafkaApplication app, final String[] a public static int startApplicationWithoutExit(final KafkaApplication app, final String[] args) { final String[] populatedArgs = addEnvironmentVariablesArguments(args); final CommandLine commandLine = new CommandLine(app) - .setExecutionStrategy(app::executionStrategy); + .setExecutionStrategy(app::execute); return commandLine.execute(populatedArgs); } @@ -129,16 +129,17 @@ protected KafkaEndpointConfig getEndpointConfig() { .build(); } - private void configureCommand() { + private void startApplication() { log.info("Starting application"); + Runtime.getRuntime().addShutdownHook(new Thread(this::close)); if (this.debug) { this.configureDebug(); } log.debug("Starting application: {}", this); } - private int executionStrategy(final ParseResult parseResult) { - this.configureCommand(); + private int execute(final ParseResult parseResult) { + this.startApplication(); final int exitCode = new CommandLine.RunLast().execute(parseResult); this.close(); return exitCode; diff --git a/streams-bootstrap-picocli/src/main/java/com/bakdata/kafka/KafkaProducerApplication.java b/streams-bootstrap-picocli/src/main/java/com/bakdata/kafka/KafkaProducerApplication.java index e9755366..f5654b73 100644 --- a/streams-bootstrap-picocli/src/main/java/com/bakdata/kafka/KafkaProducerApplication.java +++ b/streams-bootstrap-picocli/src/main/java/com/bakdata/kafka/KafkaProducerApplication.java @@ -25,6 +25,7 @@ package com.bakdata.kafka; import java.util.Map; +import java.util.concurrent.ConcurrentLinkedDeque; import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.Setter; @@ -48,18 +49,28 @@ @Slf4j @Command(description = "Run a Kafka Producer application") public abstract class KafkaProducerApplication extends KafkaApplication { + @ToString.Exclude + // ConcurrentLinkedDeque required because calling #close() causes asynchronous #run() calls to finish and thus + // concurrently iterating on #runners and removing from #runners + private ConcurrentLinkedDeque> runningApps = new ConcurrentLinkedDeque<>(); @Override public void run() { - final ProducerRunner runner = this.createRunner(); - runner.run(); + try (final ExecutableProducerApp app = this.createExecutableApp()) { + this.runningApps.add(app); + final ProducerRunner runner = app.createRunner(); + runner.run(); + this.runningApps.remove(app); + } } @Command(description = "Delete all output topics associated with the Kafka Producer application.") @Override public void clean() { - final ProducerCleanUpRunner cleanUpRunner = this.createCleanUpRunner(); - cleanUpRunner.clean(); + try (final ExecutableProducerApp app = this.createExecutableApp()) { + final ProducerCleanUpRunner cleanUpRunner = app.createCleanUpRunner(); + cleanUpRunner.clean(); + } } public ConfiguredProducerApp createConfiguredApp() { @@ -86,14 +97,14 @@ public ProducerTopicConfig createTopicConfig() { public abstract ProducerApp createApp(); - public ProducerRunner createRunner() { - final ExecutableProducerApp executableApp = this.createExecutableApp(); - return executableApp.createRunner(); + @Override + public void close() { + super.close(); + this.stop(); } - public ProducerCleanUpRunner createCleanUpRunner() { - final ExecutableProducerApp executableApp = this.createExecutableApp(); - return executableApp.createCleanUpRunner(); + public void stop() { + this.runningApps.forEach(ExecutableProducerApp::close); } private ExecutableProducerApp createExecutableApp() { diff --git a/streams-bootstrap-picocli/src/main/java/com/bakdata/kafka/KafkaStreamsApplication.java b/streams-bootstrap-picocli/src/main/java/com/bakdata/kafka/KafkaStreamsApplication.java index 6ad2f431..63d8818b 100644 --- a/streams-bootstrap-picocli/src/main/java/com/bakdata/kafka/KafkaStreamsApplication.java +++ b/streams-bootstrap-picocli/src/main/java/com/bakdata/kafka/KafkaStreamsApplication.java @@ -31,6 +31,7 @@ import java.util.concurrent.ConcurrentLinkedDeque; import java.util.regex.Pattern; import lombok.Getter; +import lombok.NonNull; import lombok.RequiredArgsConstructor; import lombok.Setter; import lombok.ToString; @@ -79,7 +80,7 @@ public abstract class KafkaStreamsApplication extends KafkaApplication { @ToString.Exclude // ConcurrentLinkedDeque required because calling #close() causes asynchronous #run() calls to finish and thus // concurrently iterating on #runners and removing from #runners - private ConcurrentLinkedDeque runners = new ConcurrentLinkedDeque<>(); + private ConcurrentLinkedDeque runningApps = new ConcurrentLinkedDeque<>(); /** * Run the application. If Kafka Streams is run, this method blocks until Kafka Streams has completed shutdown, @@ -87,21 +88,15 @@ public abstract class KafkaStreamsApplication extends KafkaApplication { */ @Override public void run() { - try (final StreamsRunner runner = this.createRunner()) { - this.runners.add(runner); - runner.run(); - this.runners.remove(runner); + try (final RunningApp runningApp = this.createRunningApp()) { + this.runningApps.add(runningApp); + runningApp.run(); + this.runningApps.remove(runningApp); } } - @Override - public void close() { - super.close(); - this.stop(); - } - public void stop() { - this.runners.forEach(StreamsRunner::close); + this.runningApps.forEach(RunningApp::close); } @Command( @@ -109,16 +104,38 @@ public void stop() { + "output and intermediate topics associated with the Kafka Streams application.") @Override public void clean() { - final StreamsCleanUpRunner runner = this.createCleanUpRunner(); - runner.clean(); + try (final ExecutableStreamsApp app = this.createExecutableApp(true)) { + final StreamsCleanUpRunner runner = app.createCleanUpRunner(); + runner.clean(); + } + } + + @Override + public void close() { + super.close(); + this.stop(); } @Command( description = "Clear all state stores, consumer group offsets, and internal topics associated with the " + "Kafka Streams application.") public void reset() { - final StreamsCleanUpRunner runner = this.createCleanUpRunner(); - runner.reset(); + try (final ExecutableStreamsApp app = this.createExecutableApp(true)) { + final StreamsCleanUpRunner runner = app.createCleanUpRunner(); + runner.reset(); + } + } + + public StreamsRunner createRunner(final ExecutableStreamsApp app) { + final StreamsExecutionOptions executionOptions = this.createExecutionOptions(); + final StreamsHooks hooks = this.createHooks(); + return app.createRunner(executionOptions, hooks); + } + + private RunningApp createRunningApp() { + final ExecutableStreamsApp app = this.createExecutableApp(false); + final StreamsRunner runner = this.createRunner(app); + return new RunningApp(app, runner); } public abstract StreamsApp createApp(boolean cleanUp); @@ -129,16 +146,21 @@ public StreamsExecutionOptions createExecutionOptions() { .build(); } - public StreamsRunner createRunner() { - final ExecutableStreamsApp executableStreamsApp = this.createExecutableApp(false); - final StreamsExecutionOptions executionOptions = this.createExecutionOptions(); - final StreamsHooks hooks = this.createHooks(); - return executableStreamsApp.createRunner(executionOptions, hooks); - } + @RequiredArgsConstructor + private static class RunningApp implements AutoCloseable { + private final @NonNull ExecutableStreamsApp app; + private final @NonNull StreamsRunner runner; - public StreamsCleanUpRunner createCleanUpRunner() { - final ExecutableStreamsApp executableApp = this.createExecutableApp(true); - return executableApp.createCleanUpRunner(); + @Override + public void close() { + this.runner.close(); + // close app after streams because messages currently processed might depend on resources + this.app.close(); + } + + private void run() { + this.runner.run(); + } } public ConfiguredStreamsApp createConfiguredApp(final boolean cleanUp) { diff --git a/streams-bootstrap/src/main/java/com/bakdata/kafka/CapturingStreamsUncaughtExceptionHandler.java b/streams-bootstrap/src/main/java/com/bakdata/kafka/CapturingStreamsUncaughtExceptionHandler.java index d0ef6473..93146792 100644 --- a/streams-bootstrap/src/main/java/com/bakdata/kafka/CapturingStreamsUncaughtExceptionHandler.java +++ b/streams-bootstrap/src/main/java/com/bakdata/kafka/CapturingStreamsUncaughtExceptionHandler.java @@ -29,7 +29,7 @@ import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler; @RequiredArgsConstructor -public class CapturingStreamsUncaughtExceptionHandler implements StreamsUncaughtExceptionHandler { +class CapturingStreamsUncaughtExceptionHandler implements StreamsUncaughtExceptionHandler { private final @NonNull StreamsUncaughtExceptionHandler wrapped; private Throwable lastException; @@ -41,7 +41,7 @@ public StreamThreadExceptionResponse handle(final Throwable exception) { return response; } - public void throwException() { + void throwException() { if (this.lastException instanceof RuntimeException) { throw (RuntimeException) this.lastException; } diff --git a/streams-bootstrap/src/main/java/com/bakdata/kafka/ConfiguredProducerApp.java b/streams-bootstrap/src/main/java/com/bakdata/kafka/ConfiguredProducerApp.java index eeee4ee1..732d0766 100644 --- a/streams-bootstrap/src/main/java/com/bakdata/kafka/ConfiguredProducerApp.java +++ b/streams-bootstrap/src/main/java/com/bakdata/kafka/ConfiguredProducerApp.java @@ -34,7 +34,7 @@ import org.apache.kafka.common.serialization.StringSerializer; @RequiredArgsConstructor -public class ConfiguredProducerApp { +public class ConfiguredProducerApp implements AutoCloseable { @Getter private final @NonNull T app; private final @NonNull ProducerAppConfiguration configuration; @@ -97,4 +97,8 @@ public ProducerTopicConfig getTopics() { return this.configuration.getTopics(); } + @Override + public void close() { + this.app.close(); + } } diff --git a/streams-bootstrap/src/main/java/com/bakdata/kafka/ConfiguredStreamsApp.java b/streams-bootstrap/src/main/java/com/bakdata/kafka/ConfiguredStreamsApp.java index 2345623e..6659eb9d 100644 --- a/streams-bootstrap/src/main/java/com/bakdata/kafka/ConfiguredStreamsApp.java +++ b/streams-bootstrap/src/main/java/com/bakdata/kafka/ConfiguredStreamsApp.java @@ -35,7 +35,7 @@ import org.apache.kafka.streams.Topology; @RequiredArgsConstructor -public class ConfiguredStreamsApp { +public class ConfiguredStreamsApp implements AutoCloseable { @Getter private final @NonNull T app; private final @NonNull StreamsAppConfiguration configuration; @@ -120,4 +120,9 @@ public Topology createTopology(final Map kafkaProperties) { return topologyBuilder.build(); } + @Override + public void close() { + this.app.close(); + } + } diff --git a/streams-bootstrap/src/main/java/com/bakdata/kafka/ExecutableProducerApp.java b/streams-bootstrap/src/main/java/com/bakdata/kafka/ExecutableProducerApp.java index 6148c1af..b7fa29d9 100644 --- a/streams-bootstrap/src/main/java/com/bakdata/kafka/ExecutableProducerApp.java +++ b/streams-bootstrap/src/main/java/com/bakdata/kafka/ExecutableProducerApp.java @@ -31,7 +31,7 @@ @RequiredArgsConstructor @Getter -public class ExecutableProducerApp { +public class ExecutableProducerApp implements AutoCloseable { private final @NonNull ProducerTopicConfig topics; private final @NonNull Map kafkaProperties; private final @NonNull T app; @@ -48,4 +48,9 @@ public ProducerRunner createRunner() { .build(); return new ProducerRunner(() -> this.app.run(producerBuilder)); } + + @Override + public void close() { + this.app.close(); + } } diff --git a/streams-bootstrap/src/main/java/com/bakdata/kafka/ExecutableStreamsApp.java b/streams-bootstrap/src/main/java/com/bakdata/kafka/ExecutableStreamsApp.java index f17f0f65..b5037022 100644 --- a/streams-bootstrap/src/main/java/com/bakdata/kafka/ExecutableStreamsApp.java +++ b/streams-bootstrap/src/main/java/com/bakdata/kafka/ExecutableStreamsApp.java @@ -32,15 +32,15 @@ @RequiredArgsConstructor @Getter -public class ExecutableStreamsApp { +public class ExecutableStreamsApp implements AutoCloseable { private final @NonNull Topology topology; - private final @NonNull StreamsConfig kafkaProperties; + private final @NonNull StreamsConfig streamsConfig; private final @NonNull T app; public StreamsCleanUpRunner createCleanUpRunner() { final StreamsCleanUpConfigurer configurer = this.app.setupCleanUp(); - return StreamsCleanUpRunner.create(this.topology, this.kafkaProperties, configurer); + return StreamsCleanUpRunner.create(this.topology, this.streamsConfig, configurer); } public StreamsRunner createRunner() { @@ -58,10 +58,14 @@ public StreamsRunner createRunner(final StreamsHooks hooks) { public StreamsRunner createRunner(final StreamsExecutionOptions executionOptions, final StreamsHooks hooks) { return StreamsRunner.builder() .topology(this.topology) - .config(this.kafkaProperties) + .config(this.streamsConfig) .executionOptions(executionOptions) .hooks(hooks) .build(); } + @Override + public void close() { + this.app.close(); + } } diff --git a/streams-bootstrap/src/main/java/com/bakdata/kafka/ProducerApp.java b/streams-bootstrap/src/main/java/com/bakdata/kafka/ProducerApp.java index 7efde9f2..b655a1b5 100644 --- a/streams-bootstrap/src/main/java/com/bakdata/kafka/ProducerApp.java +++ b/streams-bootstrap/src/main/java/com/bakdata/kafka/ProducerApp.java @@ -28,7 +28,8 @@ import java.util.Map; import org.apache.kafka.clients.producer.ProducerConfig; -public interface ProducerApp { +@FunctionalInterface +public interface ProducerApp extends AutoCloseable { void run(ProducerBuilder builder); @@ -64,4 +65,9 @@ default Map createKafkaProperties() { default ProducerCleanUpConfigurer setupCleanUp() { return new ProducerCleanUpConfigurer(); } + + @Override + default void close() { + // do nothing by default + } } diff --git a/streams-bootstrap/src/main/java/com/bakdata/kafka/StreamsApp.java b/streams-bootstrap/src/main/java/com/bakdata/kafka/StreamsApp.java index 3a56eaa1..9679d72e 100644 --- a/streams-bootstrap/src/main/java/com/bakdata/kafka/StreamsApp.java +++ b/streams-bootstrap/src/main/java/com/bakdata/kafka/StreamsApp.java @@ -29,7 +29,7 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.streams.StreamsConfig; -public interface StreamsApp { +public interface StreamsApp extends AutoCloseable { int DEFAULT_PRODUCTIVE_REPLICATION_FACTOR = 3; /** @@ -83,4 +83,9 @@ default Map createKafkaProperties(final StreamsOptions options) default StreamsCleanUpConfigurer setupCleanUp() { return new StreamsCleanUpConfigurer(); } + + @Override + default void close() { + // do nothing by default + } } diff --git a/streams-bootstrap/src/main/java/com/bakdata/kafka/StreamsCleanUpRunner.java b/streams-bootstrap/src/main/java/com/bakdata/kafka/StreamsCleanUpRunner.java index 2f07466a..61dad36d 100644 --- a/streams-bootstrap/src/main/java/com/bakdata/kafka/StreamsCleanUpRunner.java +++ b/streams-bootstrap/src/main/java/com/bakdata/kafka/StreamsCleanUpRunner.java @@ -68,8 +68,8 @@ public static StreamsCleanUpRunner create(final @NonNull Topology topology, } public static StreamsCleanUpRunner create(final @NonNull Topology topology, - final @NonNull StreamsConfig kafkaProperties, final @NonNull StreamsCleanUpConfigurer cleanHooks) { - final StreamsAppConfig streamsAppConfig = new StreamsAppConfig(kafkaProperties); + final @NonNull StreamsConfig streamsConfig, final @NonNull StreamsCleanUpConfigurer cleanHooks) { + final StreamsAppConfig streamsAppConfig = new StreamsAppConfig(streamsConfig); final TopologyInformation topologyInformation = new TopologyInformation(topology, streamsAppConfig.getAppId()); return new StreamsCleanUpRunner(topologyInformation, topology, streamsAppConfig, cleanHooks.create(streamsAppConfig.getKafkaProperties())); diff --git a/streams-bootstrap/src/main/java/com/bakdata/kafka/StreamsRunner.java b/streams-bootstrap/src/main/java/com/bakdata/kafka/StreamsRunner.java index 21043b6b..a120c8c7 100644 --- a/streams-bootstrap/src/main/java/com/bakdata/kafka/StreamsRunner.java +++ b/streams-bootstrap/src/main/java/com/bakdata/kafka/StreamsRunner.java @@ -92,7 +92,6 @@ private void checkErrors() { private void runStreams() { log.info("Starting Kafka Streams"); this.streams.start(); - Runtime.getRuntime().addShutdownHook(new Thread(this::close)); log.info("Calling start hook"); this.hooks.onStart(this.streams); }