-
Notifications
You must be signed in to change notification settings - Fork 3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add --unique-app-id parameter #197
Closed
Closed
Changes from 5 commits
Commits
Show all changes
14 commits
Select commit
Hold shift + click to select a range
3865e13
Add --application-id parameter
philipp94831 eafd69e
Add --application-id parameter
philipp94831 5fab21b
Add --application-id parameter
philipp94831 580fbb7
Add --application-id parameter
philipp94831 3d572de
Add --application-id parameter
philipp94831 615d351
Add --application-id parameter
philipp94831 1dbec03
Add --application-id parameter
philipp94831 32634a9
Add --application-id parameter
philipp94831 ec6d53d
Add --application-id parameter
philipp94831 6a4cc96
Add --application-id parameter
philipp94831 3d13af0
Add --application-id parameter
philipp94831 d78a1f6
Merge remote-tracking branch 'origin/master' into feature/application…
philipp94831 31530ac
Update
philipp94831 981ab00
Merge remote-tracking branch 'origin/master' into feature/application…
philipp94831 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,7 +1,7 @@ | ||
/* | ||
* MIT License | ||
* | ||
* Copyright (c) 2023 bakdata | ||
* Copyright (c) 2024 bakdata | ||
* | ||
* Permission is hereby granted, free of charge, to any person obtaining a copy | ||
* of this software and associated documentation files (the "Software"), to deal | ||
|
@@ -94,6 +94,8 @@ public abstract class KafkaStreamsApplication extends KafkaApplication implement | |
@CommandLine.Option(names = "--volatile-group-instance-id", arity = "0..1", | ||
description = "Whether the group instance id is volatile, i.e., it will change on a Streams shutdown.") | ||
private boolean volatileGroupInstanceId; | ||
@CommandLine.Option(names = "--application-id", description = "Application ID to use for Kafka Streams") | ||
private String uniqueAppId; | ||
private KafkaStreams streams; | ||
private Throwable lastException; | ||
|
||
|
@@ -155,12 +157,6 @@ public void close() { | |
*/ | ||
public abstract void buildTopology(StreamsBuilder builder); | ||
|
||
/** | ||
* This must be set to a unique value for every application interacting with your kafka cluster to ensure internal | ||
* state encapsulation. Could be set to: className-inputTopic-outputTopic | ||
*/ | ||
public abstract String getUniqueAppId(); | ||
|
||
/** | ||
* Create the topology of the Kafka Streams app | ||
* | ||
|
@@ -271,13 +267,31 @@ protected Properties createKafkaProperties() { | |
kafkaConfig.setProperty(StreamsConfig.producerPrefix(ProducerConfig.COMPRESSION_TYPE_CONFIG), "gzip"); | ||
|
||
// topology | ||
kafkaConfig.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, this.getUniqueAppId()); | ||
kafkaConfig.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, this.getApplicationId()); | ||
|
||
this.configureDefaultSerde(kafkaConfig); | ||
kafkaConfig.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.getBrokers()); | ||
return kafkaConfig; | ||
} | ||
|
||
private String getApplicationId() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. And I think we need a test for this method There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will add tests, just wanted to have your thoughts first |
||
final String uniqueAppId = this.getUniqueAppId(); | ||
raminqaf marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if (uniqueAppId == null) { | ||
if (this.uniqueAppId == null) { | ||
throw new IllegalArgumentException("Must pass --application-id or implement #getUniqueAppId()"); | ||
raminqaf marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
return this.uniqueAppId; | ||
} | ||
if (this.uniqueAppId == null) { | ||
return uniqueAppId; | ||
} | ||
if (!uniqueAppId.equals(this.uniqueAppId)) { | ||
throw new IllegalArgumentException( | ||
"Application ID provided via --application-id does not match #getUniqueAppId()"); | ||
} | ||
return uniqueAppId; | ||
} | ||
|
||
/** | ||
* Run the Streams application. This method blocks until Kafka Streams has completed shutdown, either because it | ||
* caught an error or the application has received a shutdown event. | ||
|
@@ -322,7 +336,7 @@ protected void runCleanUp() { | |
try (final ImprovedAdminClient adminClient = this.createAdminClient()) { | ||
final CleanUpRunner cleanUpRunner = CleanUpRunner.builder() | ||
.topology(this.createTopology()) | ||
.appId(this.getUniqueAppId()) | ||
.appId(this.getApplicationId()) | ||
.adminClient(adminClient) | ||
.streams(this.streams) | ||
.build(); | ||
|
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess it would clutter the API even more and lead to confusion together with
getUniqueAppId()
, but I feel like we will need to call it elsewhereThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm, this method does the validation whenever Kafka properties are created. So getUniqueAppId() should still return the correct result unless a user explicitly overrides the method and it returns null. I don't think this will ever happen