Skip to content

Commit

Permalink
Create v3
Browse files Browse the repository at this point in the history
  • Loading branch information
philipp94831 committed Apr 4, 2024
1 parent 746db7f commit f374ffc
Show file tree
Hide file tree
Showing 12 changed files with 115 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public static void startApplication(final KafkaApplication app, final String[] a
public static int startApplicationWithoutExit(final KafkaApplication app, final String[] args) {
final String[] populatedArgs = addEnvironmentVariablesArguments(args);
final CommandLine commandLine = new CommandLine(app)
.setExecutionStrategy(app::executionStrategy);
.setExecutionStrategy(app::execute);
return commandLine.execute(populatedArgs);
}

Expand Down Expand Up @@ -129,16 +129,17 @@ protected KafkaEndpointConfig getEndpointConfig() {
.build();
}

private void configureCommand() {
private void startApplication() {
log.info("Starting application");
Runtime.getRuntime().addShutdownHook(new Thread(this::close));
if (this.debug) {
this.configureDebug();
}
log.debug("Starting application: {}", this);
}

private int executionStrategy(final ParseResult parseResult) {
this.configureCommand();
private int execute(final ParseResult parseResult) {
this.startApplication();
final int exitCode = new CommandLine.RunLast().execute(parseResult);
this.close();
return exitCode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
package com.bakdata.kafka;

import java.util.Map;
import java.util.concurrent.ConcurrentLinkedDeque;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
Expand All @@ -48,18 +49,28 @@
@Slf4j
@Command(description = "Run a Kafka Producer application")
public abstract class KafkaProducerApplication extends KafkaApplication {
@ToString.Exclude
// ConcurrentLinkedDeque required because calling #close() causes asynchronous #run() calls to finish and thus
// concurrently iterating on #runners and removing from #runners
private ConcurrentLinkedDeque<ExecutableProducerApp<ProducerApp>> runningApps = new ConcurrentLinkedDeque<>();

@Override
public void run() {
final ProducerRunner runner = this.createRunner();
runner.run();
try (final ExecutableProducerApp<ProducerApp> app = this.createExecutableApp()) {
this.runningApps.add(app);
final ProducerRunner runner = app.createRunner();
runner.run();
this.runningApps.remove(app);
}
}

@Command(description = "Delete all output topics associated with the Kafka Producer application.")
@Override
public void clean() {
final ProducerCleanUpRunner cleanUpRunner = this.createCleanUpRunner();
cleanUpRunner.clean();
try (final ExecutableProducerApp<ProducerApp> app = this.createExecutableApp()) {
final ProducerCleanUpRunner cleanUpRunner = app.createCleanUpRunner();
cleanUpRunner.clean();
}
}

public ConfiguredProducerApp<ProducerApp> createConfiguredApp() {
Expand All @@ -86,14 +97,14 @@ public ProducerTopicConfig createTopicConfig() {

public abstract ProducerApp createApp();

public ProducerRunner createRunner() {
final ExecutableProducerApp<ProducerApp> executableApp = this.createExecutableApp();
return executableApp.createRunner();
@Override
public void close() {
super.close();
this.stop();
}

public ProducerCleanUpRunner createCleanUpRunner() {
final ExecutableProducerApp<ProducerApp> executableApp = this.createExecutableApp();
return executableApp.createCleanUpRunner();
public void stop() {
this.runningApps.forEach(ExecutableProducerApp::close);
}

private ExecutableProducerApp<ProducerApp> createExecutableApp() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.regex.Pattern;
import lombok.Getter;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.ToString;
Expand Down Expand Up @@ -79,46 +80,62 @@ public abstract class KafkaStreamsApplication extends KafkaApplication {
@ToString.Exclude
// ConcurrentLinkedDeque required because calling #close() causes asynchronous #run() calls to finish and thus
// concurrently iterating on #runners and removing from #runners
private ConcurrentLinkedDeque<StreamsRunner> runners = new ConcurrentLinkedDeque<>();
private ConcurrentLinkedDeque<RunningApp> runningApps = new ConcurrentLinkedDeque<>();

/**
* Run the application. If Kafka Streams is run, this method blocks until Kafka Streams has completed shutdown,
* either because it caught an error or the application has received a shutdown event.
*/
@Override
public void run() {
try (final StreamsRunner runner = this.createRunner()) {
this.runners.add(runner);
runner.run();
this.runners.remove(runner);
try (final RunningApp runningApp = this.createRunningApp()) {
this.runningApps.add(runningApp);
runningApp.run();
this.runningApps.remove(runningApp);
}
}

@Override
public void close() {
super.close();
this.stop();
}

public void stop() {
this.runners.forEach(StreamsRunner::close);
this.runningApps.forEach(RunningApp::close);
}

@Command(
description = "Reset the Kafka Streams application. Additionally, delete the consumer group and all "
+ "output and intermediate topics associated with the Kafka Streams application.")
@Override
public void clean() {
final StreamsCleanUpRunner runner = this.createCleanUpRunner();
runner.clean();
try (final ExecutableStreamsApp<StreamsApp> app = this.createExecutableApp(true)) {
final StreamsCleanUpRunner runner = app.createCleanUpRunner();
runner.clean();
}
}

@Override
public void close() {
super.close();
this.stop();
}

@Command(
description = "Clear all state stores, consumer group offsets, and internal topics associated with the "
+ "Kafka Streams application.")
public void reset() {
final StreamsCleanUpRunner runner = this.createCleanUpRunner();
runner.reset();
try (final ExecutableStreamsApp<StreamsApp> app = this.createExecutableApp(true)) {
final StreamsCleanUpRunner runner = app.createCleanUpRunner();
runner.reset();
}
}

public StreamsRunner createRunner(final ExecutableStreamsApp<StreamsApp> app) {
final StreamsExecutionOptions executionOptions = this.createExecutionOptions();
final StreamsHooks hooks = this.createHooks();
return app.createRunner(executionOptions, hooks);
}

private RunningApp createRunningApp() {
final ExecutableStreamsApp<StreamsApp> app = this.createExecutableApp(false);
final StreamsRunner runner = this.createRunner(app);
return new RunningApp(app, runner);
}

public abstract StreamsApp createApp(boolean cleanUp);
Expand All @@ -129,16 +146,21 @@ public StreamsExecutionOptions createExecutionOptions() {
.build();
}

public StreamsRunner createRunner() {
final ExecutableStreamsApp<StreamsApp> executableStreamsApp = this.createExecutableApp(false);
final StreamsExecutionOptions executionOptions = this.createExecutionOptions();
final StreamsHooks hooks = this.createHooks();
return executableStreamsApp.createRunner(executionOptions, hooks);
}
@RequiredArgsConstructor
private static class RunningApp implements AutoCloseable {
private final @NonNull ExecutableStreamsApp<StreamsApp> app;
private final @NonNull StreamsRunner runner;

public StreamsCleanUpRunner createCleanUpRunner() {
final ExecutableStreamsApp<StreamsApp> executableApp = this.createExecutableApp(true);
return executableApp.createCleanUpRunner();
@Override
public void close() {
this.runner.close();
// close app after streams because messages currently processed might depend on resources
this.app.close();
}

private void run() {
this.runner.run();
}
}

public ConfiguredStreamsApp<StreamsApp> createConfiguredApp(final boolean cleanUp) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;

@RequiredArgsConstructor
public class CapturingStreamsUncaughtExceptionHandler implements StreamsUncaughtExceptionHandler {
class CapturingStreamsUncaughtExceptionHandler implements StreamsUncaughtExceptionHandler {

private final @NonNull StreamsUncaughtExceptionHandler wrapped;
private Throwable lastException;
Expand All @@ -41,7 +41,7 @@ public StreamThreadExceptionResponse handle(final Throwable exception) {
return response;
}

public void throwException() {
void throwException() {
if (this.lastException instanceof RuntimeException) {
throw (RuntimeException) this.lastException;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import org.apache.kafka.common.serialization.StringSerializer;

@RequiredArgsConstructor
public class ConfiguredProducerApp<T extends ProducerApp> {
public class ConfiguredProducerApp<T extends ProducerApp> implements AutoCloseable {
@Getter
private final @NonNull T app;
private final @NonNull ProducerAppConfiguration configuration;
Expand Down Expand Up @@ -97,4 +97,8 @@ public ProducerTopicConfig getTopics() {
return this.configuration.getTopics();
}

@Override
public void close() {
this.app.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import org.apache.kafka.streams.Topology;

@RequiredArgsConstructor
public class ConfiguredStreamsApp<T extends StreamsApp> {
public class ConfiguredStreamsApp<T extends StreamsApp> implements AutoCloseable {
@Getter
private final @NonNull T app;
private final @NonNull StreamsAppConfiguration configuration;
Expand Down Expand Up @@ -120,4 +120,9 @@ public Topology createTopology(final Map<String, Object> kafkaProperties) {
return topologyBuilder.build();
}

@Override
public void close() {
this.app.close();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@

@RequiredArgsConstructor
@Getter
public class ExecutableProducerApp<T extends ProducerApp> {
public class ExecutableProducerApp<T extends ProducerApp> implements AutoCloseable {
private final @NonNull ProducerTopicConfig topics;
private final @NonNull Map<String, Object> kafkaProperties;
private final @NonNull T app;
Expand All @@ -48,4 +48,9 @@ public ProducerRunner createRunner() {
.build();
return new ProducerRunner(() -> this.app.run(producerBuilder));
}

@Override
public void close() {
this.app.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,15 @@

@RequiredArgsConstructor
@Getter
public class ExecutableStreamsApp<T extends StreamsApp> {
public class ExecutableStreamsApp<T extends StreamsApp> implements AutoCloseable {

private final @NonNull Topology topology;
private final @NonNull StreamsConfig kafkaProperties;
private final @NonNull StreamsConfig streamsConfig;
private final @NonNull T app;

public StreamsCleanUpRunner createCleanUpRunner() {
final StreamsCleanUpConfigurer configurer = this.app.setupCleanUp();
return StreamsCleanUpRunner.create(this.topology, this.kafkaProperties, configurer);
return StreamsCleanUpRunner.create(this.topology, this.streamsConfig, configurer);
}

public StreamsRunner createRunner() {
Expand All @@ -58,10 +58,14 @@ public StreamsRunner createRunner(final StreamsHooks hooks) {
public StreamsRunner createRunner(final StreamsExecutionOptions executionOptions, final StreamsHooks hooks) {
return StreamsRunner.builder()
.topology(this.topology)
.config(this.kafkaProperties)
.config(this.streamsConfig)
.executionOptions(executionOptions)
.hooks(hooks)
.build();
}

@Override
public void close() {
this.app.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;

public interface ProducerApp {
@FunctionalInterface
public interface ProducerApp extends AutoCloseable {

void run(ProducerBuilder builder);

Expand Down Expand Up @@ -64,4 +65,9 @@ default Map<String, Object> createKafkaProperties() {
default ProducerCleanUpConfigurer setupCleanUp() {
return new ProducerCleanUpConfigurer();
}

@Override
default void close() {
// do nothing by default
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

public interface StreamsApp {
public interface StreamsApp extends AutoCloseable {
int DEFAULT_PRODUCTIVE_REPLICATION_FACTOR = 3;

/**
Expand Down Expand Up @@ -83,4 +83,9 @@ default Map<String, Object> createKafkaProperties(final StreamsOptions options)
default StreamsCleanUpConfigurer setupCleanUp() {
return new StreamsCleanUpConfigurer();
}

@Override
default void close() {
// do nothing by default
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ public static StreamsCleanUpRunner create(final @NonNull Topology topology,
}

public static StreamsCleanUpRunner create(final @NonNull Topology topology,
final @NonNull StreamsConfig kafkaProperties, final @NonNull StreamsCleanUpConfigurer cleanHooks) {
final StreamsAppConfig streamsAppConfig = new StreamsAppConfig(kafkaProperties);
final @NonNull StreamsConfig streamsConfig, final @NonNull StreamsCleanUpConfigurer cleanHooks) {
final StreamsAppConfig streamsAppConfig = new StreamsAppConfig(streamsConfig);
final TopologyInformation topologyInformation = new TopologyInformation(topology, streamsAppConfig.getAppId());
return new StreamsCleanUpRunner(topologyInformation, topology, streamsAppConfig,
cleanHooks.create(streamsAppConfig.getKafkaProperties()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ private void checkErrors() {
private void runStreams() {
log.info("Starting Kafka Streams");
this.streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(this::close));
log.info("Calling start hook");
this.hooks.onStart(this.streams);
}
Expand Down

0 comments on commit f374ffc

Please sign in to comment.