diff --git a/streams-bootstrap-cli-test/src/main/java/com/bakdata/kafka/TestApplicationRunner.java b/streams-bootstrap-cli-test/src/main/java/com/bakdata/kafka/TestApplicationRunner.java index 1b0595c6..6c5a3590 100644 --- a/streams-bootstrap-cli-test/src/main/java/com/bakdata/kafka/TestApplicationRunner.java +++ b/streams-bootstrap-cli-test/src/main/java/com/bakdata/kafka/TestApplicationRunner.java @@ -24,7 +24,10 @@ package com.bakdata.kafka; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableList.Builder; import java.lang.Thread.UncaughtExceptionHandler; +import java.util.List; import lombok.Getter; import lombok.NonNull; import lombok.RequiredArgsConstructor; @@ -37,15 +40,30 @@ public final class TestApplicationRunner { private final @NonNull String bootstrapServers; private final @NonNull SchemaRegistryEnv schemaRegistryEnv; - public Thread run(final KafkaStreamsApplication app) { - this.prepareExecution(app); - final Thread thread = new Thread(app); + private static Thread start(final Runnable runnable) { + final Thread thread = new Thread(runnable); final UncaughtExceptionHandler handler = new CapturingUncaughtExceptionHandler(); thread.setUncaughtExceptionHandler(handler); thread.start(); return thread; } + public Thread start(final KafkaStreamsApplication app, final String[] args) { + final Builder argBuilder = ImmutableList.builder() + .add(args) + .add("--bootstrap-servers", this.bootstrapServers); + if (this.schemaRegistryEnv.getSchemaRegistryUrl() != null) { + argBuilder.add("--schema-registry-url", this.schemaRegistryEnv.getSchemaRegistryUrl()); + } + final List newArgs = argBuilder.build(); + return start(() -> KafkaApplication.startApplicationWithoutExit(app, newArgs.toArray(new String[0]))); + } + + public Thread run(final KafkaStreamsApplication app) { + this.prepareExecution(app); + return start(app); + } + public void clean(final KafkaStreamsApplication app) { this.prepareExecution(app); app.clean();