Skip to content

Commit

Permalink
Add hook to prepare running of app
Browse files Browse the repository at this point in the history
  • Loading branch information
philipp94831 committed Aug 20, 2024
1 parent e1d8b61 commit 9145b0c
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ private static String[] addEnvironmentVariablesArguments(final String[] args) {
* Clean all resources associated with this application
*/
public void clean() {
this.prepareClean();
try (final CleanableApp<CR> cleanableApp = this.createCleanableApp()) {
final CR cleanUpRunner = cleanableApp.getCleanUpRunner();
cleanUpRunner.clean();
Expand Down Expand Up @@ -279,6 +280,13 @@ protected void prepareRun() {
// do nothing by default
}

/**
* Called before cleaning the application, i.e., invoking {@link #clean()}
*/
protected void prepareClean() {
// do nothing by default
}

private void startApplication() {
Runtime.getRuntime().addShutdownHook(new Thread(this::close));
this.onApplicationStart();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ public void clean() {
@Command(description = "Clear all state stores, consumer group offsets, and internal topics associated with the "
+ "Kafka Streams application.")
public void reset() {
this.prepareClean();
try (final CleanableApp<StreamsCleanUpRunner> app = this.createCleanableApp()) {
final StreamsCleanUpRunner runner = app.getCleanUpRunner();
runner.reset();
Expand Down Expand Up @@ -147,6 +148,14 @@ public final ConfiguredStreamsApp<StreamsApp> createConfiguredApp(final StreamsA
return configuredApp;
}

/**
* Called before cleaning the application, i.e., invoking {@link #clean()} or {@link #reset()}
*/
@Override
protected void prepareClean() {
super.prepareClean();
}

/**
* Create a {@link StateListener} to use for Kafka Streams.
*
Expand Down

0 comments on commit 9145b0c

Please sign in to comment.