Skip to content

Commit

Permalink
Update
Browse files Browse the repository at this point in the history
  • Loading branch information
philipp94831 committed Jan 23, 2025
1 parent 3433b25 commit 17ed85b
Showing 1 changed file with 21 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@

package com.bakdata.kafka;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableList.Builder;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.List;
import lombok.Getter;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
Expand All @@ -37,15 +40,30 @@ public final class TestApplicationRunner {
private final @NonNull String bootstrapServers;
private final @NonNull SchemaRegistryEnv schemaRegistryEnv;

public Thread run(final KafkaStreamsApplication<? extends StreamsApp> app) {
this.prepareExecution(app);
final Thread thread = new Thread(app);
private static Thread start(final Runnable runnable) {
final Thread thread = new Thread(runnable);
final UncaughtExceptionHandler handler = new CapturingUncaughtExceptionHandler();
thread.setUncaughtExceptionHandler(handler);
thread.start();
return thread;
}

public Thread start(final KafkaStreamsApplication<? extends StreamsApp> app, final String[] args) {
final Builder<String> argBuilder = ImmutableList.<String>builder()
.add(args)
.add("--bootstrap-servers", this.bootstrapServers);
if (this.schemaRegistryEnv.getSchemaRegistryUrl() != null) {
argBuilder.add("--schema-registry-url", this.schemaRegistryEnv.getSchemaRegistryUrl());
}
final List<String> newArgs = argBuilder.build();
return start(() -> KafkaApplication.startApplicationWithoutExit(app, newArgs.toArray(new String[0])));
}

public Thread run(final KafkaStreamsApplication<? extends StreamsApp> app) {
this.prepareExecution(app);
return start(app);
}

public void clean(final KafkaStreamsApplication<? extends StreamsApp> app) {
this.prepareExecution(app);
app.clean();
Expand Down

0 comments on commit 17ed85b

Please sign in to comment.