Skip to content

Commit

Permalink
Create v3
Browse files Browse the repository at this point in the history
  • Loading branch information
philipp94831 committed Apr 5, 2024
1 parent ebafc02 commit ee6e226
Show file tree
Hide file tree
Showing 8 changed files with 66 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@
@RequiredArgsConstructor
@Slf4j
@Command(mixinStandardHelpOptions = true)
public abstract class KafkaApplication<C extends CleanUpRunner, O> implements Runnable, AutoCloseable {
public abstract class KafkaApplication<R extends Runner, C extends CleanUpRunner, O>
implements Runnable, AutoCloseable {
private static final String ENV_PREFIX = Optional.ofNullable(System.getenv("ENV_PREFIX")).orElse("APP_");
@CommandLine.Option(names = "--output-topic", description = "Output topic")
private String outputTopic;
Expand Down Expand Up @@ -94,7 +95,7 @@ public abstract class KafkaApplication<C extends CleanUpRunner, O> implements Ru
* @param args Arguments passed in by the custom application class.
* @see #startApplicationWithoutExit(KafkaApplication, String[])
*/
public static void startApplication(final KafkaApplication<?, ?> app, final String[] args) {
public static void startApplication(final KafkaApplication<?, ?, ?> app, final String[] args) {
final int exitCode = startApplicationWithoutExit(app, args);
System.exit(exitCode);
}
Expand All @@ -107,7 +108,7 @@ public static void startApplication(final KafkaApplication<?, ?> app, final Stri
* @param args Arguments passed in by the custom application class.
* @return Exit code of application
*/
public static int startApplicationWithoutExit(final KafkaApplication<?, ?> app, final String[] args) {
public static int startApplicationWithoutExit(final KafkaApplication<?, ?, ?> app, final String[] args) {
final String[] populatedArgs = addEnvironmentVariablesArguments(args);
final CommandLine commandLine = new CommandLine(app)
.setExecutionStrategy(app::execute);
Expand Down Expand Up @@ -163,16 +164,32 @@ protected void configureDebug() {
Configurator.setLevel(this.getClass().getPackageName(), Level.DEBUG);
}

protected ExecutableApp<? extends Runner, C, O> createExecutableApp(final boolean cleanUp) {
final ConfiguredApp<? extends ExecutableApp<? extends Runner, C, O>> configuredStreamsApp =
/**
* Create a new {@code ExecutableApp} that will be executed according to the requested command.
* @param cleanUp whether app is created for clean up purposes. In that case, the user might want to skip
* initialization of expensive resources.
* @return {@code ExecutableApp}
*/
protected ExecutableApp<R, C, O> createExecutableApp(final boolean cleanUp) {
final ConfiguredApp<? extends ExecutableApp<R, C, O>> configuredStreamsApp =
this.createConfiguredApp(cleanUp);
final KafkaEndpointConfig endpointConfig = this.getEndpointConfig();
return configuredStreamsApp.withEndpoint(endpointConfig);
}

protected abstract ConfiguredApp<? extends ExecutableApp<? extends Runner, C, O>> createConfiguredApp(
boolean cleanUp);
/**
* Create a new {@code ConfiguredApp} that will be executed according to this application.
* @param cleanUp whether {@code ConfiguredApp} is created for clean up purposes. In that case, the user might want
* to skip initialization of expensive resources.
* @return {@code ConfiguredApp}
*/
protected abstract ConfiguredApp<? extends ExecutableApp<R, C, O>> createConfiguredApp(boolean cleanUp);

/**
* Create options for running the app
* @return run options
* @see ExecutableApp#createRunner(Object)
*/
protected abstract O createExecutionOptions();

private KafkaEndpointConfig getEndpointConfig() {
Expand All @@ -199,15 +216,15 @@ private int execute(final ParseResult parseResult) {
}

private RunningApp<Runner> createRunningApp() {
final ExecutableApp<? extends Runner, ?, O> app = this.createExecutableApp(false);
final ExecutableApp<?, ?, O> app = this.createExecutableApp(false);
final O executionOptions = this.createExecutionOptions();
final Runner runner = app.createRunner(executionOptions);
return new RunningApp<>(app, runner);
}

@RequiredArgsConstructor
private static class RunningApp<T extends Runner> implements AutoCloseable {
private final @NonNull ExecutableApp<? extends T, ?, ?> app;
private final @NonNull ExecutableApp<?, ?, ?> app;
private final @NonNull T runner;

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
@Slf4j
@Command(description = "Run a Kafka Producer application")
public abstract class KafkaProducerApplication
extends KafkaApplication<ProducerCleanUpRunner, ProducerExecutionOptions> {
extends KafkaApplication<ProducerRunner, ProducerCleanUpRunner, ProducerExecutionOptions> {

/**
* @see ProducerRunner#run()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@
@RequiredArgsConstructor
@Slf4j
@Command(description = "Run a Kafka Streams application.")
public abstract class KafkaStreamsApplication extends KafkaApplication<StreamsCleanUpRunner, StreamsExecutionOptions> {
public abstract class KafkaStreamsApplication
extends KafkaApplication<StreamsRunner, StreamsCleanUpRunner, StreamsExecutionOptions> {
@CommandLine.Option(names = "--input-topics", description = "Input topics", split = ",")
private List<String> inputTopics = new ArrayList<>();
@CommandLine.Option(names = "--input-pattern", description = "Input pattern")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,13 @@

package com.bakdata.kafka;

/**
* Cleans all resources associated with an application
*/
@FunctionalInterface
public interface CleanUpRunner {
/**
* Clean all resources associated with an application
*/
void clean();
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,17 @@

package com.bakdata.kafka;

/**
* An application with a corresponding configuration
*
* @param <E> type of executable app after configuring {@link KafkaEndpointConfig}
*/
public interface ConfiguredApp<E> extends AutoCloseable {
/**
* Create an executable app using the provided {@code KafkaEndpointConfig}
* @param endpointConfig endpoint to run app on
* @return executable streams app
*/
E withEndpoint(KafkaEndpointConfig endpointConfig);

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@ public Map<String, Object> getKafkaProperties(final KafkaEndpointConfig endpoint

/**
* Create an {@code ExecutableProducerApp} using the provided {@code KafkaEndpointConfig}
* @param endpointConfig endpoint to run app on
* @return {@code ExecutableProducerApp}
*/
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,8 @@ public StreamsTopicConfig getTopics() {
}

/**
* Create an {@code ExecutableProducerApp} using the provided {@code KafkaEndpointConfig}
* @param endpointConfig endpoint to run app on
* @return {@code ExecutableProducerApp}
* Create an {@code ExecutableStreamsApp} using the provided {@code KafkaEndpointConfig}
* @return {@code ExecutableStreamsApp}
*/
@Override
public ExecutableStreamsApp<T> withEndpoint(final KafkaEndpointConfig endpointConfig) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,33 @@

package com.bakdata.kafka;

/**
* An application with a corresponding topic and Kafka configuration
* @param <T> type of {@link Runner}
* @param <C> type of {@link CleanUpRunner}
* @param <O> type of options to create {@link Runner}
*/
public interface ExecutableApp<T extends Runner, C extends CleanUpRunner, O> extends AutoCloseable {

@Override
void close();

/**
* Create {@code Runner} in order to run application with default options
* @return {@code Runner}
*/
T createRunner();

/**
* Create {@code Runner} in order to run application
* @param options options for creating runner
* @return {@code Runner}
*/
T createRunner(O options);

/**
* Create {@code CleanUpRunner} in order to clean application
* @return {@code CleanUpRunner}
*/
C createCleanUpRunner();
}

0 comments on commit ee6e226

Please sign in to comment.