From 9145b0c101c21b682dc1940e1ecc7b869fb42d44 Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Tue, 20 Aug 2024 09:03:27 +0200 Subject: [PATCH] Add hook to prepare running of app --- .../main/java/com/bakdata/kafka/KafkaApplication.java | 8 ++++++++ .../java/com/bakdata/kafka/KafkaStreamsApplication.java | 9 +++++++++ 2 files changed, 17 insertions(+) diff --git a/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaApplication.java b/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaApplication.java index 0b201f12..bac2a236 100644 --- a/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaApplication.java +++ b/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaApplication.java @@ -160,6 +160,7 @@ private static String[] addEnvironmentVariablesArguments(final String[] args) { * Clean all resources associated with this application */ public void clean() { + this.prepareClean(); try (final CleanableApp cleanableApp = this.createCleanableApp()) { final CR cleanUpRunner = cleanableApp.getCleanUpRunner(); cleanUpRunner.clean(); @@ -279,6 +280,13 @@ protected void prepareRun() { // do nothing by default } + /** + * Called before cleaning the application, i.e., invoking {@link #clean()} + */ + protected void prepareClean() { + // do nothing by default + } + private void startApplication() { Runtime.getRuntime().addShutdownHook(new Thread(this::close)); this.onApplicationStart(); diff --git a/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaStreamsApplication.java b/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaStreamsApplication.java index f0281306..3095b468 100644 --- a/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaStreamsApplication.java +++ b/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaStreamsApplication.java @@ -106,6 +106,7 @@ public void clean() { @Command(description = "Clear all state stores, consumer group offsets, and internal topics associated with the " + "Kafka Streams application.") public void reset() { + this.prepareClean(); try (final CleanableApp app = this.createCleanableApp()) { final StreamsCleanUpRunner runner = app.getCleanUpRunner(); runner.reset(); @@ -147,6 +148,14 @@ public final ConfiguredStreamsApp createConfiguredApp(final StreamsA return configuredApp; } + /** + * Called before cleaning the application, i.e., invoking {@link #clean()} or {@link #reset()} + */ + @Override + protected void prepareClean() { + super.prepareClean(); + } + /** * Create a {@link StateListener} to use for Kafka Streams. *