Skip to content

Commit

Permalink
Create v3
Browse files Browse the repository at this point in the history
  • Loading branch information
philipp94831 committed Apr 5, 2024
1 parent e6a1581 commit ff7a2a0
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ public void close() {
/**
* Stop all applications that have been started by {@link #run()}.
*/
public void stop() {
public final void stop() {
this.runningApps.forEach(RunningApp::close);
}

Expand Down Expand Up @@ -177,7 +177,7 @@ protected void configureDebug() {
* initialization of expensive resources.
* @return {@code ExecutableApp}
*/
protected ExecutableApp<R, C, O> createExecutableApp(final boolean cleanUp) {
final ExecutableApp<R, C, O> createExecutableApp(final boolean cleanUp) {
final ConfiguredApp<? extends ExecutableApp<R, C, O>> configuredStreamsApp =
this.createConfiguredApp(cleanUp);
final KafkaEndpointConfig endpointConfig = this.getEndpointConfig();
Expand All @@ -190,14 +190,14 @@ protected ExecutableApp<R, C, O> createExecutableApp(final boolean cleanUp) {
* to skip initialization of expensive resources.
* @return {@code ConfiguredApp}
*/
protected abstract ConfiguredApp<? extends ExecutableApp<R, C, O>> createConfiguredApp(boolean cleanUp);
abstract ConfiguredApp<? extends ExecutableApp<R, C, O>> createConfiguredApp(boolean cleanUp);

/**
* Create options for running the app
* @return run options
* @see ExecutableApp#createRunner(Object)
*/
protected abstract O createExecutionOptions();
abstract O createExecutionOptions();

private KafkaEndpointConfig getEndpointConfig() {
return KafkaEndpointConfig.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,12 @@ public void clean() {
protected abstract ProducerApp createApp(boolean cleanUp);

@Override
protected ProducerExecutionOptions createExecutionOptions() {
final ProducerExecutionOptions createExecutionOptions() {
return ProducerExecutionOptions.builder().build();
}

@Override
protected ConfiguredProducerApp<ProducerApp> createConfiguredApp(final boolean cleanUp) {
final ConfiguredProducerApp<ProducerApp> createConfiguredApp(final boolean cleanUp) {
final ProducerApp producerApp = this.createApp(cleanUp);
final ProducerAppConfiguration configuration = this.createConfiguration();
return new ConfiguredProducerApp<>(producerApp, configuration);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ public abstract class KafkaStreamsApplication
private boolean volatileGroupInstanceId;

/**
*
* @see StreamsRunner#run()
*/
@Override
Expand Down Expand Up @@ -149,7 +148,7 @@ protected void onStreamsStart(final KafkaStreams streams) {
}

@Override
protected StreamsExecutionOptions createExecutionOptions() {
final StreamsExecutionOptions createExecutionOptions() {
return StreamsExecutionOptions.builder()
.volatileGroupInstanceId(this.volatileGroupInstanceId)
.uncaughtExceptionHandler(this::createUncaughtExceptionHandler)
Expand All @@ -159,7 +158,7 @@ protected StreamsExecutionOptions createExecutionOptions() {
}

@Override
protected ConfiguredStreamsApp<StreamsApp> createConfiguredApp(final boolean cleanUp) {
final ConfiguredStreamsApp<StreamsApp> createConfiguredApp(final boolean cleanUp) {
final StreamsApp streamsApp = this.createApp(cleanUp);
final StreamsAppConfiguration streamsAppConfiguration = this.createConfiguration();
return new ConfiguredStreamsApp<>(streamsApp, streamsAppConfiguration);
Expand Down

0 comments on commit ff7a2a0

Please sign in to comment.